query/
query_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion_expr::{AggregateUDF, LogicalPlan};
32use datatypes::schema::Schema;
33pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
34use partition::manager::PartitionRuleManagerRef;
35use session::context::QueryContextRef;
36use table::TableRef;
37
38use crate::dataframe::DataFrame;
39use crate::datafusion::DatafusionQueryEngine;
40use crate::error::Result;
41use crate::options::QueryOptions;
42use crate::planner::LogicalPlanner;
43pub use crate::query_engine::context::QueryEngineContext;
44pub use crate::query_engine::state::QueryEngineState;
45use crate::region_query::RegionQueryHandlerRef;
46
47/// Describe statement result
48#[derive(Debug)]
49pub struct DescribeResult {
50    /// The schema of statement
51    pub schema: Schema,
52    /// The logical plan for statement
53    pub logical_plan: LogicalPlan,
54}
55
56#[async_trait]
57pub trait QueryEngine: Send + Sync {
58    /// Returns the query engine as Any
59    /// so that it can be downcast to a specific implementation.
60    fn as_any(&self) -> &dyn Any;
61
62    /// Returns the logical planner
63    fn planner(&self) -> Arc<dyn LogicalPlanner>;
64
65    /// Returns the query engine name.
66    fn name(&self) -> &str;
67
68    /// Describe the given [`LogicalPlan`].
69    async fn describe(
70        &self,
71        plan: LogicalPlan,
72        query_ctx: QueryContextRef,
73    ) -> Result<DescribeResult>;
74
75    /// Execute the given [`LogicalPlan`].
76    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
77
78    /// Register an aggregate function.
79    ///
80    /// # Panics
81    /// Will panic if the function with same name is already registered.
82    fn register_aggregate_function(&self, func: AggregateUDF);
83
84    /// Register a scalar function.
85    /// Will override if the function with same name is already registered.
86    fn register_scalar_function(&self, func: ScalarFunctionFactory);
87
88    /// Create a DataFrame from a table.
89    fn read_table(&self, table: TableRef) -> Result<DataFrame>;
90
91    /// Create a [`QueryEngineContext`].
92    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
93
94    /// Retrieve the query engine state [`QueryEngineState`]
95    fn engine_state(&self) -> &QueryEngineState;
96}
97
98pub struct QueryEngineFactory {
99    query_engine: Arc<dyn QueryEngine>,
100}
101
102impl QueryEngineFactory {
103    pub fn new(
104        catalog_manager: CatalogManagerRef,
105        region_query_handler: Option<RegionQueryHandlerRef>,
106        table_mutation_handler: Option<TableMutationHandlerRef>,
107        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
108        flow_service_handler: Option<FlowServiceHandlerRef>,
109        with_dist_planner: bool,
110        options: QueryOptions,
111    ) -> Self {
112        Self::new_with_plugins(
113            catalog_manager,
114            None,
115            region_query_handler,
116            table_mutation_handler,
117            procedure_service_handler,
118            flow_service_handler,
119            with_dist_planner,
120            Default::default(),
121            options,
122        )
123    }
124
125    #[allow(clippy::too_many_arguments)]
126    pub fn new_with_plugins(
127        catalog_manager: CatalogManagerRef,
128        partition_rule_manager: Option<PartitionRuleManagerRef>,
129        region_query_handler: Option<RegionQueryHandlerRef>,
130        table_mutation_handler: Option<TableMutationHandlerRef>,
131        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
132        flow_service_handler: Option<FlowServiceHandlerRef>,
133        with_dist_planner: bool,
134        plugins: Plugins,
135        options: QueryOptions,
136    ) -> Self {
137        let state = Arc::new(QueryEngineState::new(
138            catalog_manager,
139            partition_rule_manager,
140            region_query_handler,
141            table_mutation_handler,
142            procedure_service_handler,
143            flow_service_handler,
144            with_dist_planner,
145            plugins.clone(),
146            options,
147        ));
148        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
149        register_functions(&query_engine);
150        Self { query_engine }
151    }
152
153    pub fn query_engine(&self) -> QueryEngineRef {
154        self.query_engine.clone()
155    }
156}
157
158/// Register all functions implemented by GreptimeDB
159fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
160    for func in FUNCTION_REGISTRY.scalar_functions() {
161        query_engine.register_scalar_function(func);
162    }
163
164    for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
165        query_engine.register_aggregate_function(accumulator);
166    }
167}
168
169pub type QueryEngineRef = Arc<dyn QueryEngine>;
170
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built with no optional handlers and default options must
    /// hand out an engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let options = QueryOptions::default();
        let factory = QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            false,
            options,
        );

        assert_eq!("datafusion", factory.query_engine().name());
    }
}