1mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion_expr::{AggregateUDF, LogicalPlan};
32use datatypes::schema::Schema;
33pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
34use session::context::QueryContextRef;
35use table::TableRef;
36
37use crate::dataframe::DataFrame;
38use crate::datafusion::DatafusionQueryEngine;
39use crate::error::Result;
40use crate::options::QueryOptions;
41use crate::planner::LogicalPlanner;
42pub use crate::query_engine::context::QueryEngineContext;
43pub use crate::query_engine::state::QueryEngineState;
44use crate::region_query::RegionQueryHandlerRef;
45
46#[derive(Debug)]
48pub struct DescribeResult {
49 pub schema: Schema,
51 pub logical_plan: LogicalPlan,
53}
54
55#[async_trait]
56pub trait QueryEngine: Send + Sync {
57 fn as_any(&self) -> &dyn Any;
60
61 fn planner(&self) -> Arc<dyn LogicalPlanner>;
63
64 fn name(&self) -> &str;
66
67 async fn describe(
69 &self,
70 plan: LogicalPlan,
71 query_ctx: QueryContextRef,
72 ) -> Result<DescribeResult>;
73
74 async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
76
77 fn register_aggregate_function(&self, func: AggregateUDF);
82
83 fn register_scalar_function(&self, func: ScalarFunctionFactory);
86
87 fn read_table(&self, table: TableRef) -> Result<DataFrame>;
89
90 fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
92
93 fn engine_state(&self) -> &QueryEngineState;
95}
96
97pub struct QueryEngineFactory {
98 query_engine: Arc<dyn QueryEngine>,
99}
100
101impl QueryEngineFactory {
102 pub fn new(
103 catalog_manager: CatalogManagerRef,
104 region_query_handler: Option<RegionQueryHandlerRef>,
105 table_mutation_handler: Option<TableMutationHandlerRef>,
106 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
107 flow_service_handler: Option<FlowServiceHandlerRef>,
108 with_dist_planner: bool,
109 options: QueryOptions,
110 ) -> Self {
111 Self::new_with_plugins(
112 catalog_manager,
113 region_query_handler,
114 table_mutation_handler,
115 procedure_service_handler,
116 flow_service_handler,
117 with_dist_planner,
118 Default::default(),
119 options,
120 )
121 }
122
123 #[allow(clippy::too_many_arguments)]
124 pub fn new_with_plugins(
125 catalog_manager: CatalogManagerRef,
126 region_query_handler: Option<RegionQueryHandlerRef>,
127 table_mutation_handler: Option<TableMutationHandlerRef>,
128 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
129 flow_service_handler: Option<FlowServiceHandlerRef>,
130 with_dist_planner: bool,
131 plugins: Plugins,
132 options: QueryOptions,
133 ) -> Self {
134 let state = Arc::new(QueryEngineState::new(
135 catalog_manager,
136 region_query_handler,
137 table_mutation_handler,
138 procedure_service_handler,
139 flow_service_handler,
140 with_dist_planner,
141 plugins.clone(),
142 options,
143 ));
144 let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
145 register_functions(&query_engine);
146 Self { query_engine }
147 }
148
149 pub fn query_engine(&self) -> QueryEngineRef {
150 self.query_engine.clone()
151 }
152}
153
154fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
156 for func in FUNCTION_REGISTRY.scalar_functions() {
157 query_engine.register_scalar_function(func);
158 }
159
160 for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
161 query_engine.register_aggregate_function(accumulator);
162 }
163}
164
165pub type QueryEngineRef = Arc<dyn QueryEngine>;
166
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built without any optional handlers and without the
    /// distributed planner yields an engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();

        let engine = QueryEngineFactory::new(
            catalog_manager,
            None, // region_query_handler
            None, // table_mutation_handler
            None, // procedure_service_handler
            None, // flow_service_handler
            false,
            QueryOptions::default(),
        )
        .query_engine();

        assert_eq!("datafusion", engine.name());
    }
}