1mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion::catalog::TableFunction;
32use datafusion::dataframe::DataFrame;
33use datafusion_expr::{AggregateUDF, LogicalPlan, WindowUDF};
34use datatypes::schema::Schema;
35pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
36use partition::manager::PartitionRuleManagerRef;
37use session::context::QueryContextRef;
38use table::TableRef;
39
40use crate::datafusion::DatafusionQueryEngine;
41use crate::error::Result;
42use crate::options::QueryOptions;
43use crate::planner::LogicalPlanner;
44pub use crate::query_engine::context::QueryEngineContext;
45pub use crate::query_engine::state::QueryEngineState;
46use crate::region_query::RegionQueryHandlerRef;
47
48#[derive(Debug)]
50pub struct DescribeResult {
51 pub schema: Schema,
53 pub logical_plan: LogicalPlan,
55}
56
57#[async_trait]
58pub trait QueryEngine: Send + Sync {
59 fn as_any(&self) -> &dyn Any;
62
63 fn planner(&self) -> Arc<dyn LogicalPlanner>;
65
66 fn name(&self) -> &str;
68
69 async fn describe(
71 &self,
72 plan: LogicalPlan,
73 query_ctx: QueryContextRef,
74 ) -> Result<DescribeResult>;
75
76 async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
78
79 fn register_aggregate_function(&self, func: AggregateUDF);
84
85 fn register_scalar_function(&self, func: ScalarFunctionFactory);
88
89 fn register_table_function(&self, func: Arc<TableFunction>);
91
92 fn register_window_function(&self, func: WindowUDF);
94
95 fn read_table(&self, table: TableRef) -> Result<DataFrame>;
97
98 fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
100
101 fn engine_state(&self) -> &QueryEngineState;
103}
104
105pub struct QueryEngineFactory {
106 query_engine: Arc<dyn QueryEngine>,
107}
108
109impl QueryEngineFactory {
110 pub fn new(
111 catalog_manager: CatalogManagerRef,
112 region_query_handler: Option<RegionQueryHandlerRef>,
113 table_mutation_handler: Option<TableMutationHandlerRef>,
114 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
115 flow_service_handler: Option<FlowServiceHandlerRef>,
116 with_dist_planner: bool,
117 options: QueryOptions,
118 ) -> Self {
119 Self::new_with_plugins(
120 catalog_manager,
121 None,
122 region_query_handler,
123 table_mutation_handler,
124 procedure_service_handler,
125 flow_service_handler,
126 with_dist_planner,
127 Default::default(),
128 options,
129 )
130 }
131
132 #[allow(clippy::too_many_arguments)]
133 pub fn new_with_plugins(
134 catalog_manager: CatalogManagerRef,
135 partition_rule_manager: Option<PartitionRuleManagerRef>,
136 region_query_handler: Option<RegionQueryHandlerRef>,
137 table_mutation_handler: Option<TableMutationHandlerRef>,
138 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
139 flow_service_handler: Option<FlowServiceHandlerRef>,
140 with_dist_planner: bool,
141 plugins: Plugins,
142 options: QueryOptions,
143 ) -> Self {
144 let state = Arc::new(QueryEngineState::new(
145 catalog_manager,
146 partition_rule_manager,
147 region_query_handler,
148 table_mutation_handler,
149 procedure_service_handler,
150 flow_service_handler,
151 with_dist_planner,
152 plugins.clone(),
153 options,
154 ));
155 let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
156 register_functions(&query_engine);
157 Self { query_engine }
158 }
159
160 pub fn query_engine(&self) -> QueryEngineRef {
161 self.query_engine.clone()
162 }
163}
164
165fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
167 for func in FUNCTION_REGISTRY.scalar_functions() {
168 query_engine.register_scalar_function(func);
169 }
170
171 for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
172 query_engine.register_aggregate_function(accumulator);
173 }
174
175 for table_function in FUNCTION_REGISTRY.table_functions() {
176 query_engine.register_table_function(table_function);
177 }
178
179 for window_function in FUNCTION_REGISTRY.window_functions() {
180 query_engine.register_window_function(window_function);
181 }
182}
183
184pub type QueryEngineRef = Arc<dyn QueryEngine>;
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 #[test]
191 fn test_query_engine_factory() {
192 let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
193 let factory = QueryEngineFactory::new(
194 catalog_list,
195 None,
196 None,
197 None,
198 None,
199 false,
200 QueryOptions::default(),
201 );
202
203 let engine = factory.query_engine();
204
205 assert_eq!("datafusion", engine.name());
206 }
207}