use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::handlers::{
    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::state::FunctionState;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;

use crate::dist_plan::{
    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
};
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::transcribe_atat::TranscribeAtatRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::optimizer::ExtensionAnalyzerRule;
use crate::options::QueryOptions as QueryOptionsNew;
use crate::query_engine::options::QueryOptions;
use crate::query_engine::DefaultSerializer;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;
use crate::QueryEngineContext;

/// Query engine global state, shared by all queries running on this engine.
#[derive(Clone)]
pub struct QueryEngineState {
    df_context: SessionContext,
    catalog_manager: CatalogManagerRef,
    function_state: Arc<FunctionState>,
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}

impl fmt::Debug for QueryEngineState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("QueryEngineState")
            .field("state", &self.df_context.state())
            .finish()
    }
}

impl QueryEngineState {
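    /// Creates the engine state: a DataFusion [`SessionContext`] wired up with
    /// GreptimeDB's analyzer, optimizer, and physical optimizer rules, plus the
    /// function registries and handlers passed in by the caller.
    ///
    /// A minimal construction sketch for a setup without distributed planning;
    /// `catalog_manager` and `plugins` are assumed to be built elsewhere, and the
    /// options type is assumed to provide a usable `Default`:
    ///
    /// ```ignore
    /// let state = QueryEngineState::new(
    ///     catalog_manager, // CatalogManagerRef from the caller
    ///     None,            // no partition rule manager
    ///     None,            // no region query handler
    ///     None,            // no table mutation handler
    ///     None,            // no procedure service handler
    ///     None,            // no flow service handler
    ///     false,           // skip the distributed planner analyzer
    ///     plugins,         // Plugins from the caller
    ///     crate::options::QueryOptions::default(),
    /// );
    /// ```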
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        let runtime_env = Arc::new(RuntimeEnv::default());
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // Skip DataFusion's schema check on physical aggregate plans.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Extension analyzer rules run before DataFusion's own analyzer rules.
        let mut extension_rules = Vec::new();

        // Type conversion is kept at the head of the extension rule list.
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // DataFusion analyzer rules; our rules are prepended so that they see the plan
        // before DataFusion's built-in rules rewrite it.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        // DataFusion logical optimizer rules plus our scan-hint rule.
        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // Physical optimizer rules. The fixed indices interleave our rules with
        // DataFusion's defaults so they run at a specific point in the pass order.
        let mut physical_optimizer = PhysicalOptimizer::new();
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        // Run an additional `EnforceSorting` pass right after the rules inserted above.
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Move the sanity check to the very end so it validates the fully optimized plan.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes all physical optimizer rules with the given name from `rules`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

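    /// Runs the engine's extension analyzer rules over the logical plan, folding the
    /// plan through each rule in registration order.
    ///
    /// A minimal sketch of the intended call order; `state`, `plan`, and `ctx`
    /// (a [`QueryEngineContext`]) are assumed to be built elsewhere:
    ///
    /// ```ignore
    /// let analyzed = state.optimize_by_extension_rules(plan, &ctx)?;
    /// let optimized = state.optimize_logical_plan(analyzed)?;
    /// ```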
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Optimizes the logical plan with the DataFusion optimizer rules registered in
    /// the session state.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

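    /// Looks up a registered scalar function factory by name.
    ///
    /// A small sketch; `"my_scalar_fn"` is a hypothetical name, not a function shipped
    /// by this crate:
    ///
    /// ```ignore
    /// if let Some(factory) = state.scalar_function("my_scalar_fn") {
    ///     // use the factory to build the concrete UDF for this query
    /// }
    /// ```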
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Returns the names of all registered scalar functions.
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Looks up a registered aggregate function by name.
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Returns the names of all registered aggregate functions.
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

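    /// Registers a scalar function factory under its name, replacing (with a warning)
    /// any factory previously registered under the same name.
    ///
    /// A minimal sketch; `factory` is assumed to be a [`ScalarFunctionFactory`]
    /// obtained elsewhere:
    ///
    /// ```ignore
    /// let name = factory.name().to_string();
    /// state.register_scalar_function(factory);
    /// assert!(state.scalar_function(&name).is_some());
    /// ```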
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

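    /// Registers an aggregate UDF under its name.
    ///
    /// # Panics
    ///
    /// Panics if an aggregate function with the same name is already registered.
    ///
    /// A minimal sketch; `udaf` is assumed to be an [`AggregateUDF`] built elsewhere:
    ///
    /// ```ignore
    /// let name = udaf.name().to_string();
    /// state.register_aggr_function(udaf);
    /// assert!(state.aggr_function(&name).is_some());
    /// ```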
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the table mutation handler, if one was provided at construction time.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the procedure service handler, if one was provided at construction time.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether cross-catalog queries are disallowed, as configured through the
    /// [`QueryOptions`] plugin (defaults to `false` when the plugin is absent).
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns a snapshot of the underlying DataFusion [`SessionState`].
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

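    /// Creates a DataFusion [`DataFrame`] that scans the given table through the
    /// [`DfTableProviderAdapter`].
    ///
    /// A minimal sketch; `table` is assumed to be a [`TableRef`] resolved from the
    /// catalog manager elsewhere:
    ///
    /// ```ignore
    /// let df = state.read_table(table)?;
    /// let batches = df.collect().await?;
    /// ```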
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}

/// The query planner handed to DataFusion; it wraps a [`DefaultPhysicalPlanner`]
/// extended with GreptimeDB's extension planners.
struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}

impl fmt::Debug for DfQueryPlanner {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("DfQueryPlanner").finish()
    }
}

#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

impl DfQueryPlanner {
    fn new(
        catalog_manager: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
    ) -> Self {
        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
        // The distributed planners are only installed when both the region query handler
        // and the partition rule manager are available.
        if let (Some(region_query_handler), Some(partition_rule_manager)) =
            (region_query_handler, partition_rule_manager)
        {
            planners.push(Arc::new(DistExtensionPlanner::new(
                catalog_manager,
                partition_rule_manager,
                region_query_handler,
            )));
            planners.push(Arc::new(MergeSortExtensionPlanner {}));
        }
        Self {
            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
        }
    }
}