use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::handlers::{
    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::state::FunctionState;
use common_stat::get_total_memory_bytes;
use common_telemetry::warn;
use datafusion::catalog::TableFunction;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;

use crate::QueryEngineContext;
use crate::dist_plan::{
    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::transcribe_atat::TranscribeAtatRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::options::QueryOptions as QueryOptionsNew;
use crate::query_engine::DefaultSerializer;
use crate::query_engine::options::QueryOptions;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;

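/// Query engine global state.
///
/// Wraps the shared DataFusion [`SessionContext`] together with the catalog
/// manager, the scalar/aggregate/table function registries, and the
/// extension analyzer rules.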
#[derive(Clone)]
pub struct QueryEngineState {
    df_context: SessionContext,
    catalog_manager: CatalogManagerRef,
    function_state: Arc<FunctionState>,
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}

impl fmt::Debug for QueryEngineState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("QueryEngineState")
            .field("state", &self.df_context.state())
            .finish()
    }
}

impl QueryEngineState {
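    /// Builds the engine state: configures the DataFusion session (optional
    /// memory limit, target partitions, analyzer/optimizer rule sets) and
    /// installs the custom query planner.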
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        // Bound query memory with a metered pool when a limit is configured;
        // otherwise fall back to the default (unbounded) runtime.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
        let runtime_env = if memory_pool_size > 0 {
            Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
                    .build()
                    .expect("Failed to build RuntimeEnv"),
            )
        } else {
            Arc::new(RuntimeEnv::default())
        };
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // Skip DataFusion's physical aggregate schema check.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        let mut extension_rules = Vec::new();

        // The `TypeConversionRule` must run first among the extension rules.
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Prepend custom analyzer rules so they run before DataFusion's defaults.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // Insert custom physical rules at fixed positions relative to
        // DataFusion's built-in rule order.
        let mut physical_optimizer = PhysicalOptimizer::new();
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Move the sanity check to the end: drop the built-in instance and
        // push a fresh one so it validates the final plan after all rules.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            table_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

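    /// Runs the engine's extension analyzer rules over the plan, in order.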
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

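    /// Optimizes the logical plan with the session's optimizer rules.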
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

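    /// Returns the scalar function factory with the given name, if any.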
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

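    /// Returns the names of all registered scalar functions.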
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

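    /// Returns the aggregate function with the given name, if any.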
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

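    /// Returns the names of all registered aggregate functions.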
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

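    /// Returns the table function with the given name, if any.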
    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
        self.table_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

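    /// Returns the names of all registered table functions.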
    pub fn table_function_names(&self) -> Vec<String> {
        self.table_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

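    /// Registers a scalar function, overriding any previous registration
    /// under the same name (a warning is logged when that happens).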
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

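    /// Registers an aggregate function.
    ///
    /// # Panics
    ///
    /// Panics if a function with the same name is already registered; unlike
    /// scalar and table functions, aggregate registrations are not overridden.
    ///
    /// Illustrative sketch only; `my_sum_udaf` is a hypothetical
    /// [`AggregateUDF`] constructor, not part of this crate:
    ///
    /// ```ignore
    /// let state: QueryEngineState = /* obtained from the query engine */;
    /// state.register_aggr_function(my_sum_udaf());
    /// ```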
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

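    /// Registers a table function, overriding any previous registration
    /// under the same name (a warning is logged when that happens).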
    pub fn register_table_function(&self, func: Arc<TableFunction>) {
        let name = func.name();
        let x = self
            .table_functions
            .write()
            .unwrap()
            .insert(name.to_string(), func.clone());

        if x.is_some() {
            warn!("Already registered table function '{name}'");
        }
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

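    /// Returns the table mutation handler, if one is configured.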
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

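    /// Returns the procedure service handler, if one is configured.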
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

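    /// Whether cross-catalog queries are disallowed, read from the
    /// [`QueryOptions`] plugin; defaults to `false` when the plugin is absent.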
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

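    /// Returns a snapshot of the DataFusion session state.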
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

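    /// Creates a [`DataFrame`] that reads from the given table.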
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}

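/// Query planner that extends DataFusion's [`DefaultPhysicalPlanner`] with
/// the PromQL and range-select extension planners and, when distributed
/// planning is available, the distributed and merge-sort planners.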
struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}

impl fmt::Debug for DfQueryPlanner {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("DfQueryPlanner").finish()
    }
}

#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

impl DfQueryPlanner {
    fn new(
        catalog_manager: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
    ) -> Self {
        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
        // Distributed planning requires both the region query handler and
        // the partition rule manager.
        if let (Some(region_query_handler), Some(partition_rule_manager)) =
            (region_query_handler, partition_rule_manager)
        {
            planners.push(Arc::new(DistExtensionPlanner::new(
                catalog_manager,
                partition_rule_manager,
                region_query_handler,
            )));
            planners.push(Arc::new(MergeSortExtensionPlanner {}));
        }
        Self {
            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
        }
    }
}

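/// A [`MemoryPool`] that delegates to a [`GreedyMemoryPool`] while exporting
/// pool usage and rejected reservations via the crate's metrics.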
#[derive(Debug)]
struct MetricsMemoryPool {
    inner: Arc<GreedyMemoryPool>,
}

impl MetricsMemoryPool {
    fn new(limit: usize) -> Self {
        Self {
            inner: Arc::new(GreedyMemoryPool::new(limit)),
        }
    }

    /// Exports the currently reserved bytes to the usage gauge.
    #[inline]
    fn update_metrics(&self) {
        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
    }
}

impl MemoryPool for MetricsMemoryPool {
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
    }

    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.inner.grow(reservation, additional);
        self.update_metrics();
    }

    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.inner.shrink(reservation, shrink);
        self.update_metrics();
    }

    fn try_grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) -> datafusion_common::Result<()> {
        // Count reservations rejected by the underlying pool, then refresh
        // the usage gauge regardless of the outcome.
        let result = self.inner.try_grow(reservation, additional);
        if result.is_err() {
            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
        }
        self.update_metrics();
        result
    }

    fn reserved(&self) -> usize {
        self.inner.reserved()
    }

    fn memory_limit(&self) -> MemoryLimit {
        self.inner.memory_limit()
    }
}