1mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion_expr::{AggregateUDF, LogicalPlan};
32use datatypes::schema::Schema;
33pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
34use partition::manager::PartitionRuleManagerRef;
35use session::context::QueryContextRef;
36use table::TableRef;
37
38use crate::dataframe::DataFrame;
39use crate::datafusion::DatafusionQueryEngine;
40use crate::error::Result;
41use crate::options::QueryOptions;
42use crate::planner::LogicalPlanner;
43pub use crate::query_engine::context::QueryEngineContext;
44pub use crate::query_engine::state::QueryEngineState;
45use crate::region_query::RegionQueryHandlerRef;
46
47#[derive(Debug)]
49pub struct DescribeResult {
50 pub schema: Schema,
52 pub logical_plan: LogicalPlan,
54}
55
56#[async_trait]
57pub trait QueryEngine: Send + Sync {
58 fn as_any(&self) -> &dyn Any;
61
62 fn planner(&self) -> Arc<dyn LogicalPlanner>;
64
65 fn name(&self) -> &str;
67
68 async fn describe(
70 &self,
71 plan: LogicalPlan,
72 query_ctx: QueryContextRef,
73 ) -> Result<DescribeResult>;
74
75 async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
77
78 fn register_aggregate_function(&self, func: AggregateUDF);
83
84 fn register_scalar_function(&self, func: ScalarFunctionFactory);
87
88 fn read_table(&self, table: TableRef) -> Result<DataFrame>;
90
91 fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
93
94 fn engine_state(&self) -> &QueryEngineState;
96}
97
98pub struct QueryEngineFactory {
99 query_engine: Arc<dyn QueryEngine>,
100}
101
102impl QueryEngineFactory {
103 pub fn new(
104 catalog_manager: CatalogManagerRef,
105 region_query_handler: Option<RegionQueryHandlerRef>,
106 table_mutation_handler: Option<TableMutationHandlerRef>,
107 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
108 flow_service_handler: Option<FlowServiceHandlerRef>,
109 with_dist_planner: bool,
110 options: QueryOptions,
111 ) -> Self {
112 Self::new_with_plugins(
113 catalog_manager,
114 None,
115 region_query_handler,
116 table_mutation_handler,
117 procedure_service_handler,
118 flow_service_handler,
119 with_dist_planner,
120 Default::default(),
121 options,
122 )
123 }
124
125 #[allow(clippy::too_many_arguments)]
126 pub fn new_with_plugins(
127 catalog_manager: CatalogManagerRef,
128 partition_rule_manager: Option<PartitionRuleManagerRef>,
129 region_query_handler: Option<RegionQueryHandlerRef>,
130 table_mutation_handler: Option<TableMutationHandlerRef>,
131 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
132 flow_service_handler: Option<FlowServiceHandlerRef>,
133 with_dist_planner: bool,
134 plugins: Plugins,
135 options: QueryOptions,
136 ) -> Self {
137 let state = Arc::new(QueryEngineState::new(
138 catalog_manager,
139 partition_rule_manager,
140 region_query_handler,
141 table_mutation_handler,
142 procedure_service_handler,
143 flow_service_handler,
144 with_dist_planner,
145 plugins.clone(),
146 options,
147 ));
148 let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
149 register_functions(&query_engine);
150 Self { query_engine }
151 }
152
153 pub fn query_engine(&self) -> QueryEngineRef {
154 self.query_engine.clone()
155 }
156}
157
158fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
160 for func in FUNCTION_REGISTRY.scalar_functions() {
161 query_engine.register_scalar_function(func);
162 }
163
164 for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
165 query_engine.register_aggregate_function(accumulator);
166 }
167}
168
169pub type QueryEngineRef = Arc<dyn QueryEngine>;
170
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built with no optional handlers yields an engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let with_dist_planner = false;

        let engine = QueryEngineFactory::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            with_dist_planner,
            QueryOptions::default(),
        )
        .query_engine();

        assert_eq!("datafusion", engine.name());
    }
}