1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39 TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use session::context::QueryContextRef;
54use table::TableRef;
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::QueryEngineContext;
58use crate::dist_plan::{
59 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
60 MergeSortExtensionPlanner, RemoteDynFilterReceiverExtensionPlanner,
61 RemoteDynFilterRegistryLease,
62};
63use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
64use crate::optimizer::ExtensionAnalyzerRule;
65use crate::optimizer::const_normalization::ConstNormalizationRule;
66use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
67use crate::optimizer::count_nest_aggr::CountNestAggrRule;
68use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
69use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
70use crate::optimizer::parallelize_scan::ParallelizeScan;
71use crate::optimizer::pass_distribution::PassDistribution;
72use crate::optimizer::promql_tsid_narrow_join::PromqlTsidNarrowJoin;
73use crate::optimizer::remove_duplicate::RemoveDuplicate;
74use crate::optimizer::scan_hint::ScanHintRule;
75use crate::optimizer::string_normalization::StringNormalizationRule;
76use crate::optimizer::transcribe_atat::TranscribeAtatRule;
77use crate::optimizer::type_conversion::TypeConversionRule;
78use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
79use crate::options::QueryOptions as QueryOptionsNew;
80use crate::query_engine::DefaultSerializer;
81use crate::query_engine::options::QueryOptions;
82use crate::range_select::planner::RangeSelectPlanner;
83use crate::region_query::RegionQueryHandlerRef;
84
/// Query engine global state: the DataFusion session context plus GreptimeDB's catalog,
/// handlers, function registries and extension analyzer rules.
#[derive(Clone)]
pub struct QueryEngineState {
    /// DataFusion session context, built in `QueryEngineState::new` with the custom
    /// planner and rule chains.
    df_context: SessionContext,
    /// Catalog manager; also handed to the query planner at construction time.
    catalog_manager: CatalogManagerRef,
    /// Manager of remote dynamic-filter registries, leased per remote query id.
    dyn_filter_registry_manager: Arc<DynFilterRegistryManager>,
    /// Handlers (table mutation / procedure / flow) shared with functions.
    function_state: Arc<FunctionState>,
    /// Scalar function factories registered on this state, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    /// Aggregate functions registered on this state, keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    /// Table functions registered on this state, keyed by function name.
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    /// GreptimeDB analyzer rules applied, in order, by `optimize_by_extension_rules`.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    /// Plugin container; consulted for `QueryOptions` (e.g. cross-catalog restrictions).
    plugins: Plugins,
}
98
99impl fmt::Debug for QueryEngineState {
100 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101 f.debug_struct("QueryEngineState")
102 .field("state", &self.df_context.state())
103 .finish()
104 }
105}
106
107impl QueryEngineState {
108 #[allow(clippy::too_many_arguments)]
109 pub fn new(
110 catalog_list: CatalogManagerRef,
111 partition_rule_manager: Option<PartitionRuleManagerRef>,
112 region_query_handler: Option<RegionQueryHandlerRef>,
113 table_mutation_handler: Option<TableMutationHandlerRef>,
114 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
115 flow_service_handler: Option<FlowServiceHandlerRef>,
116 with_dist_planner: bool,
117 plugins: Plugins,
118 options: QueryOptionsNew,
119 ) -> Self {
120 let total_memory = get_total_memory_bytes().max(0) as u64;
121 let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
122 let runtime_env = if memory_pool_size > 0 {
123 Arc::new(
124 RuntimeEnvBuilder::new()
125 .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
126 .build()
127 .expect("Failed to build RuntimeEnv"),
128 )
129 } else {
130 Arc::new(RuntimeEnv::default())
131 };
132 let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
133 if options.parallelism > 0 {
134 session_config = session_config.with_target_partitions(options.parallelism);
135 }
136 if options.allow_query_fallback {
137 session_config
138 .options_mut()
139 .extensions
140 .insert(DistPlannerOptions {
141 allow_query_fallback: true,
142 });
143 }
144
145 session_config
148 .options_mut()
149 .execution
150 .skip_physical_aggregate_schema_check = true;
151
152 let mut extension_rules = Vec::new();
154
155 extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
157 extension_rules.push(Arc::new(CountNestAggrRule) as _);
158
159 let mut analyzer = Analyzer::new();
161 analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
162 analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
163 analyzer
164 .rules
165 .insert(0, Arc::new(CountWildcardToTimeIndexRule));
166 analyzer.rules.push(Arc::new(ConstNormalizationRule));
167
168 analyzer.rules.insert(
172 0,
173 Arc::new(ApplyFunctionRewrites::new(
174 FUNCTION_REGISTRY.function_rewrites(),
175 )),
176 );
177
178 if with_dist_planner {
179 analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
180 }
181 analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
182
183 let mut optimizer = Optimizer::new();
184 optimizer.rules.push(Arc::new(ScanHintRule));
185 optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));
186
187 let mut physical_optimizer = PhysicalOptimizer::new();
189 physical_optimizer
191 .rules
192 .insert(5, Arc::new(ParallelizeScan));
193 physical_optimizer
195 .rules
196 .insert(6, Arc::new(PassDistribution));
197 physical_optimizer
199 .rules
200 .insert(7, Arc::new(PromqlTsidNarrowJoin));
201 physical_optimizer.rules.insert(
203 8,
204 Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
205 );
206 physical_optimizer
208 .rules
209 .push(Arc::new(WindowedSortPhysicalRule));
210 physical_optimizer
215 .rules
216 .push(Arc::new(MatchesConstantTermOptimizer));
217 physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
219 Self::remove_physical_optimizer_rule(
221 &mut physical_optimizer.rules,
222 SanityCheckPlan {}.name(),
223 );
224 physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
225
226 let session_state = SessionStateBuilder::new()
227 .with_config(session_config)
228 .with_runtime_env(runtime_env)
229 .with_default_features()
230 .with_analyzer_rules(analyzer.rules)
231 .with_serializer_registry(Arc::new(DefaultSerializer))
232 .with_query_planner(Arc::new(DfQueryPlanner::new(
233 catalog_list.clone(),
234 partition_rule_manager,
235 region_query_handler.clone(),
236 )))
237 .with_optimizer_rules(optimizer.rules)
238 .with_physical_optimizer_rules(physical_optimizer.rules)
239 .build();
240
241 let df_context = SessionContext::new_with_state(session_state);
242 register_function_aliases(&df_context);
243
244 Self {
245 df_context,
246 catalog_manager: catalog_list,
247 dyn_filter_registry_manager: Arc::new(DynFilterRegistryManager::default()),
248 function_state: Arc::new(FunctionState {
249 table_mutation_handler,
250 procedure_service_handler,
251 flow_service_handler,
252 }),
253 aggr_functions: Arc::new(RwLock::new(HashMap::new())),
254 table_functions: Arc::new(RwLock::new(HashMap::new())),
255 extension_rules,
256 plugins,
257 scalar_functions: Arc::new(RwLock::new(HashMap::new())),
258 }
259 }
260
261 fn remove_physical_optimizer_rule(
262 rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
263 name: &str,
264 ) {
265 rules.retain(|rule| rule.name() != name);
266 }
267
268 pub fn optimize_by_extension_rules(
270 &self,
271 plan: DfLogicalPlan,
272 context: &QueryEngineContext,
273 ) -> DfResult<DfLogicalPlan> {
274 self.extension_rules
275 .iter()
276 .try_fold(plan, |acc_plan, rule| {
277 rule.analyze(acc_plan, context, self.session_state().config_options())
278 })
279 }
280
281 pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
283 self.session_state().optimize(&plan)
284 }
285
286 pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
288 self.scalar_functions
289 .read()
290 .unwrap()
291 .get(function_name)
292 .cloned()
293 }
294
295 pub fn scalar_names(&self) -> Vec<String> {
297 self.scalar_functions
298 .read()
299 .unwrap()
300 .keys()
301 .cloned()
302 .collect()
303 }
304
305 pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
307 self.aggr_functions
308 .read()
309 .unwrap()
310 .get(function_name)
311 .cloned()
312 }
313
314 pub fn aggr_names(&self) -> Vec<String> {
316 self.aggr_functions
317 .read()
318 .unwrap()
319 .keys()
320 .cloned()
321 .collect()
322 }
323
324 pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
326 self.table_functions
327 .read()
328 .unwrap()
329 .get(function_name)
330 .cloned()
331 }
332
333 pub fn table_function_names(&self) -> Vec<String> {
335 self.table_functions
336 .read()
337 .unwrap()
338 .keys()
339 .cloned()
340 .collect()
341 }
342
343 pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
346 let name = func.name().to_string();
347 let x = self
348 .scalar_functions
349 .write()
350 .unwrap()
351 .insert(name.clone(), func);
352
353 if x.is_some() {
354 warn!("Already registered scalar function '{name}'");
355 }
356 }
357
358 pub fn register_aggr_function(&self, func: AggregateUDF) {
367 let name = func.name().to_string();
368 let x = self
369 .aggr_functions
370 .write()
371 .unwrap()
372 .insert(name.clone(), func);
373 assert!(
374 x.is_none(),
375 "Already registered aggregate function '{name}'"
376 );
377 }
378
379 pub fn register_table_function(&self, func: Arc<TableFunction>) {
380 let name = func.name();
381 let x = self
382 .table_functions
383 .write()
384 .unwrap()
385 .insert(name.to_string(), func.clone());
386
387 if x.is_some() {
388 warn!("Already registered table function '{name}'");
389 }
390 }
391
392 pub fn register_window_function(&self, func: WindowUDF) {
397 self.df_context.register_udwf(func);
398 }
399
400 pub fn catalog_manager(&self) -> &CatalogManagerRef {
401 &self.catalog_manager
402 }
403
404 pub fn dyn_filter_registry_manager(&self) -> Arc<DynFilterRegistryManager> {
405 self.dyn_filter_registry_manager.clone()
406 }
407
408 pub fn acquire_remote_dyn_filter_registry_lease(
409 &self,
410 query_ctx: &QueryContextRef,
411 ) -> Option<RemoteDynFilterRegistryLease> {
412 let query_id = query_ctx.remote_query_id_value()?;
413 Some(
414 self.dyn_filter_registry_manager
415 .clone()
416 .acquire_lease(query_id),
417 )
418 }
419
420 pub fn function_state(&self) -> Arc<FunctionState> {
421 self.function_state.clone()
422 }
423
424 pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
426 self.function_state.table_mutation_handler.as_ref()
427 }
428
429 pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
431 self.function_state.procedure_service_handler.as_ref()
432 }
433
434 pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
435 self.plugins
436 .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
437 .unwrap_or(false)
438 }
439
440 pub fn session_state(&self) -> SessionState {
441 self.df_context.state()
442 }
443
444 pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
446 self.df_context
447 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
448 }
449}
450
/// DataFusion query planner that plans through a [`DefaultPhysicalPlanner`] configured with
/// GreptimeDB's extension planners (installed by `DfQueryPlanner::new`).
struct DfQueryPlanner {
    /// Physical planner carrying the extension planners.
    physical_planner: DefaultPhysicalPlanner,
}
454
455impl fmt::Debug for DfQueryPlanner {
456 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
457 f.debug_struct("DfQueryPlanner").finish()
458 }
459}
460
461#[async_trait]
462impl QueryPlanner for DfQueryPlanner {
463 async fn create_physical_plan(
464 &self,
465 logical_plan: &DfLogicalPlan,
466 session_state: &SessionState,
467 ) -> DfResult<Arc<dyn ExecutionPlan>> {
468 self.physical_planner
469 .create_physical_plan(logical_plan, session_state)
470 .await
471 }
472}
473
/// `(target, alias)` pairs.
///
/// Each alias is registered for a scalar function already present in the session's
/// function registry. The alias names match MySQL built-ins (`UCASE`, `LCASE`, `CEILING`,
/// `MID`, `RAND`). NOTE(review): presumably this is for MySQL compatibility.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
482
/// `(target, alias)` pairs.
///
/// Each alias is registered for an aggregate function already present in the session's
/// function registry. The alias names match MySQL built-ins (`STD`, `VARIANCE`).
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];
486
487fn register_function_aliases(ctx: &SessionContext) {
492 let state = ctx.state();
493
494 for (target, alias) in SCALAR_FUNCTION_ALIASES {
495 if let Some(func) = state.scalar_functions().get(*target) {
496 let aliased = func.as_ref().clone().with_aliases([*alias]);
497 ctx.register_udf(aliased);
498 }
499 }
500
501 for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
502 if let Some(func) = state.aggregate_functions().get(*target) {
503 let aliased = func.as_ref().clone().with_aliases([*alias]);
504 ctx.register_udaf(aliased);
505 }
506 }
507}
508
509impl DfQueryPlanner {
510 fn new(
511 catalog_manager: CatalogManagerRef,
512 partition_rule_manager: Option<PartitionRuleManagerRef>,
513 region_query_handler: Option<RegionQueryHandlerRef>,
514 ) -> Self {
515 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
516 Arc::new(PromExtensionPlanner),
517 Arc::new(RangeSelectPlanner),
518 Arc::new(RemoteDynFilterReceiverExtensionPlanner),
519 ];
520 if let (Some(region_query_handler), Some(partition_rule_manager)) =
521 (region_query_handler, partition_rule_manager)
522 {
523 planners.push(Arc::new(DistExtensionPlanner::new(
524 catalog_manager,
525 partition_rule_manager,
526 region_query_handler,
527 )));
528 planners.push(Arc::new(MergeSortExtensionPlanner {}));
529 }
530 Self {
531 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
532 }
533 }
534}
535
/// Memory pool that delegates all accounting to a [`TrackConsumersPool`] over a
/// [`GreedyMemoryPool`].
///
/// It also publishes the currently reserved bytes and the number of rejected allocation
/// attempts to the query metrics.
#[derive(Debug)]
struct MetricsMemoryPool {
    /// The bounded, consumer-tracking pool that performs the actual accounting.
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}
544
545impl MetricsMemoryPool {
546 const TOP_CONSUMERS_TO_REPORT: usize = 5;
548
549 fn new(limit: usize) -> Self {
550 Self {
551 inner: Arc::new(TrackConsumersPool::new(
552 GreedyMemoryPool::new(limit),
553 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
554 )),
555 }
556 }
557
558 #[inline]
559 fn update_metrics(&self) {
560 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
561 }
562}
563
564impl MemoryPool for MetricsMemoryPool {
565 fn register(&self, consumer: &MemoryConsumer) {
566 self.inner.register(consumer);
567 }
568
569 fn unregister(&self, consumer: &MemoryConsumer) {
570 self.inner.unregister(consumer);
571 }
572
573 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
574 self.inner.grow(reservation, additional);
575 self.update_metrics();
576 }
577
578 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
579 self.inner.shrink(reservation, shrink);
580 self.update_metrics();
581 }
582
583 fn try_grow(
584 &self,
585 reservation: &MemoryReservation,
586 additional: usize,
587 ) -> datafusion_common::Result<()> {
588 let result = self.inner.try_grow(reservation, additional);
589 if result.is_err() {
590 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
591 }
592 self.update_metrics();
593 result
594 }
595
596 fn reserved(&self) -> usize {
597 self.inner.reserved()
598 }
599
600 fn memory_limit(&self) -> MemoryLimit {
601 self.inner.memory_limit()
602 }
603}
604
#[cfg(test)]
mod tests {
    use common_base::Plugins;
    use session::context::QueryContext;

    use super::*;
    use crate::options::QueryOptions;

    /// Standalone engine state over an in-memory catalog.
    ///
    /// No optional handlers are configured and the distributed planner is disabled.
    fn new_query_engine_state() -> QueryEngineState {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        QueryEngineState::new(
            catalog_manager,
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }

    /// Acquires a lease for `query_ctx`, failing the test when none is handed out.
    fn lease_for(
        state: &QueryEngineState,
        query_ctx: &QueryContextRef,
    ) -> RemoteDynFilterRegistryLease {
        state
            .acquire_remote_dyn_filter_registry_lease(query_ctx)
            .unwrap()
    }

    #[test]
    fn query_engine_state_reuses_query_scoped_dyn_filter_registry_lease() {
        let engine = new_query_engine_state();
        let ctx = QueryContext::arc();

        let lease_a = lease_for(&engine, &ctx);
        let lease_b = lease_for(&engine, &ctx);

        assert!(lease_a.ptr_eq(&lease_b));
        assert_eq!(engine.dyn_filter_registry_manager().registry_count(), 1);
        assert_eq!(
            lease_a.registry().query_id(),
            ctx.remote_query_id_value().unwrap()
        );
    }

    #[test]
    fn query_engine_state_relies_on_query_context_remote_query_id_contract() {
        let engine = new_query_engine_state();
        let ctx = QueryContext::arc();

        // The lease API depends on every query context carrying a remote query id.
        assert!(ctx.remote_query_id_value().is_some());

        let lease = lease_for(&engine, &ctx);

        assert_eq!(
            lease.registry().query_id(),
            ctx.remote_query_id_value().unwrap()
        );
        assert_eq!(engine.dyn_filter_registry_manager().registry_count(), 1);
    }

    #[test]
    fn query_engine_state_separates_registries_for_different_query_contexts() {
        let engine = new_query_engine_state();
        let ctx_a = QueryContext::arc();
        let ctx_b = QueryContext::arc();

        let lease_a = lease_for(&engine, &ctx_a);
        let lease_b = lease_for(&engine, &ctx_b);

        assert!(!lease_a.ptr_eq(&lease_b));
        assert_eq!(engine.dyn_filter_registry_manager().registry_count(), 2);
        assert_eq!(
            lease_a.registry().query_id(),
            ctx_a.remote_query_id_value().unwrap()
        );
        assert_eq!(
            lease_b.registry().query_id(),
            ctx_b.remote_query_id_value().unwrap()
        );
    }
}