1mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion::catalog::TableFunction;
32use datafusion_expr::{AggregateUDF, LogicalPlan};
33use datatypes::schema::Schema;
34pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
35use partition::manager::PartitionRuleManagerRef;
36use session::context::QueryContextRef;
37use table::TableRef;
38
39use crate::dataframe::DataFrame;
40use crate::datafusion::DatafusionQueryEngine;
41use crate::error::Result;
42use crate::options::QueryOptions;
43use crate::planner::LogicalPlanner;
44pub use crate::query_engine::context::QueryEngineContext;
45pub use crate::query_engine::state::QueryEngineState;
46use crate::region_query::RegionQueryHandlerRef;
47
48#[derive(Debug)]
50pub struct DescribeResult {
51 pub schema: Schema,
53 pub logical_plan: LogicalPlan,
55}
56
57#[async_trait]
58pub trait QueryEngine: Send + Sync {
59 fn as_any(&self) -> &dyn Any;
62
63 fn planner(&self) -> Arc<dyn LogicalPlanner>;
65
66 fn name(&self) -> &str;
68
69 async fn describe(
71 &self,
72 plan: LogicalPlan,
73 query_ctx: QueryContextRef,
74 ) -> Result<DescribeResult>;
75
76 async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;
78
79 fn register_aggregate_function(&self, func: AggregateUDF);
84
85 fn register_scalar_function(&self, func: ScalarFunctionFactory);
88
89 fn register_table_function(&self, func: Arc<TableFunction>);
91
92 fn read_table(&self, table: TableRef) -> Result<DataFrame>;
94
95 fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;
97
98 fn engine_state(&self) -> &QueryEngineState;
100}
101
102pub struct QueryEngineFactory {
103 query_engine: Arc<dyn QueryEngine>,
104}
105
106impl QueryEngineFactory {
107 pub fn new(
108 catalog_manager: CatalogManagerRef,
109 region_query_handler: Option<RegionQueryHandlerRef>,
110 table_mutation_handler: Option<TableMutationHandlerRef>,
111 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
112 flow_service_handler: Option<FlowServiceHandlerRef>,
113 with_dist_planner: bool,
114 options: QueryOptions,
115 ) -> Self {
116 Self::new_with_plugins(
117 catalog_manager,
118 None,
119 region_query_handler,
120 table_mutation_handler,
121 procedure_service_handler,
122 flow_service_handler,
123 with_dist_planner,
124 Default::default(),
125 options,
126 )
127 }
128
129 #[allow(clippy::too_many_arguments)]
130 pub fn new_with_plugins(
131 catalog_manager: CatalogManagerRef,
132 partition_rule_manager: Option<PartitionRuleManagerRef>,
133 region_query_handler: Option<RegionQueryHandlerRef>,
134 table_mutation_handler: Option<TableMutationHandlerRef>,
135 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
136 flow_service_handler: Option<FlowServiceHandlerRef>,
137 with_dist_planner: bool,
138 plugins: Plugins,
139 options: QueryOptions,
140 ) -> Self {
141 let state = Arc::new(QueryEngineState::new(
142 catalog_manager,
143 partition_rule_manager,
144 region_query_handler,
145 table_mutation_handler,
146 procedure_service_handler,
147 flow_service_handler,
148 with_dist_planner,
149 plugins.clone(),
150 options,
151 ));
152 let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
153 register_functions(&query_engine);
154 Self { query_engine }
155 }
156
157 pub fn query_engine(&self) -> QueryEngineRef {
158 self.query_engine.clone()
159 }
160}
161
162fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
164 for func in FUNCTION_REGISTRY.scalar_functions() {
165 query_engine.register_scalar_function(func);
166 }
167
168 for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
169 query_engine.register_aggregate_function(accumulator);
170 }
171
172 for table_function in FUNCTION_REGISTRY.table_functions() {
173 query_engine.register_table_function(table_function);
174 }
175}
176
177pub type QueryEngineRef = Arc<dyn QueryEngine>;
178
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built over an in-memory catalog, with no optional handlers
    /// and default options, must yield an engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let options = QueryOptions::default();

        let factory =
            QueryEngineFactory::new(catalog_manager, None, None, None, None, false, options);

        assert_eq!("datafusion", factory.query_engine().name());
    }
}