query/
query_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion_expr::{AggregateUDF, LogicalPlan};
32use datatypes::schema::Schema;
33pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
34use session::context::QueryContextRef;
35use table::TableRef;
36
37use crate::dataframe::DataFrame;
38use crate::datafusion::DatafusionQueryEngine;
39use crate::error::Result;
40use crate::options::QueryOptions;
41use crate::planner::LogicalPlanner;
42pub use crate::query_engine::context::QueryEngineContext;
43pub use crate::query_engine::state::QueryEngineState;
44use crate::region_query::RegionQueryHandlerRef;
45
46/// Describe statement result
47#[derive(Debug)]
48pub struct DescribeResult {
49    /// The schema of statement
50    pub schema: Schema,
51    /// The logical plan for statement
52    pub logical_plan: LogicalPlan,
53}
54
55#[async_trait]
56pub trait QueryEngine: Send + Sync {
57    /// Returns the query engine as Any
58    /// so that it can be downcast to a specific implementation.
59    fn as_any(&self) -> &dyn Any;
60
61    /// Returns the logical planner
62    fn planner(&self) -> Arc<dyn LogicalPlanner>;
63
64    /// Returns the query engine name.
65    fn name(&self) -> &str;
66
67    /// Describe the given [`LogicalPlan`].
68    async fn describe(
69        &self,
70        plan: LogicalPlan,
71        query_ctx: QueryContextRef,
72    ) -> Result<DescribeResult>;
73
74    /// Execute the given [`LogicalPlan`].
75    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
76
77    /// Register an aggregate function.
78    ///
79    /// # Panics
80    /// Will panic if the function with same name is already registered.
81    fn register_aggregate_function(&self, func: AggregateUDF);
82
83    /// Register a scalar function.
84    /// Will override if the function with same name is already registered.
85    fn register_scalar_function(&self, func: ScalarFunctionFactory);
86
87    /// Create a DataFrame from a table.
88    fn read_table(&self, table: TableRef) -> Result<DataFrame>;
89
90    /// Create a [`QueryEngineContext`].
91    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
92
93    /// Retrieve the query engine state [`QueryEngineState`]
94    fn engine_state(&self) -> &QueryEngineState;
95}
96
97pub struct QueryEngineFactory {
98    query_engine: Arc<dyn QueryEngine>,
99}
100
101impl QueryEngineFactory {
102    pub fn new(
103        catalog_manager: CatalogManagerRef,
104        region_query_handler: Option<RegionQueryHandlerRef>,
105        table_mutation_handler: Option<TableMutationHandlerRef>,
106        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
107        flow_service_handler: Option<FlowServiceHandlerRef>,
108        with_dist_planner: bool,
109        options: QueryOptions,
110    ) -> Self {
111        Self::new_with_plugins(
112            catalog_manager,
113            region_query_handler,
114            table_mutation_handler,
115            procedure_service_handler,
116            flow_service_handler,
117            with_dist_planner,
118            Default::default(),
119            options,
120        )
121    }
122
123    #[allow(clippy::too_many_arguments)]
124    pub fn new_with_plugins(
125        catalog_manager: CatalogManagerRef,
126        region_query_handler: Option<RegionQueryHandlerRef>,
127        table_mutation_handler: Option<TableMutationHandlerRef>,
128        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
129        flow_service_handler: Option<FlowServiceHandlerRef>,
130        with_dist_planner: bool,
131        plugins: Plugins,
132        options: QueryOptions,
133    ) -> Self {
134        let state = Arc::new(QueryEngineState::new(
135            catalog_manager,
136            region_query_handler,
137            table_mutation_handler,
138            procedure_service_handler,
139            flow_service_handler,
140            with_dist_planner,
141            plugins.clone(),
142            options,
143        ));
144        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
145        register_functions(&query_engine);
146        Self { query_engine }
147    }
148
149    pub fn query_engine(&self) -> QueryEngineRef {
150        self.query_engine.clone()
151    }
152}
153
154/// Register all functions implemented by GreptimeDB
155fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
156    for func in FUNCTION_REGISTRY.scalar_functions() {
157        query_engine.register_scalar_function(func);
158    }
159
160    for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
161        query_engine.register_aggregate_function(accumulator);
162    }
163}
164
165pub type QueryEngineRef = Arc<dyn QueryEngine>;
166
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built with no optional handlers and without the distributed
    /// planner must yield the engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let engine = QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            QueryOptions::default(),
        )
        .query_engine();

        assert_eq!("datafusion", engine.name());
    }
}