1use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::handlers::{
26 FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
27};
28use common_function::state::FunctionState;
29use common_stat::get_total_memory_bytes;
30use common_telemetry::warn;
31use datafusion::catalog::TableFunction;
32use datafusion::dataframe::DataFrame;
33use datafusion::error::Result as DfResult;
34use datafusion::execution::SessionStateBuilder;
35use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
36use datafusion::execution::memory_pool::{
37 GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
38 TrackConsumersPool,
39};
40use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
41use datafusion::physical_optimizer::PhysicalOptimizerRule;
42use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
43use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
44use datafusion::physical_plan::ExecutionPlan;
45use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
46use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
47use datafusion_optimizer::analyzer::Analyzer;
48use datafusion_optimizer::optimizer::Optimizer;
49use partition::manager::PartitionRuleManagerRef;
50use promql::extension_plan::PromExtensionPlanner;
51use table::TableRef;
52use table::table::adapter::DfTableProviderAdapter;
53
54use crate::QueryEngineContext;
55use crate::dist_plan::{
56 DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
57};
58use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
59use crate::optimizer::ExtensionAnalyzerRule;
60use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
61use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
62use crate::optimizer::parallelize_scan::ParallelizeScan;
63use crate::optimizer::pass_distribution::PassDistribution;
64use crate::optimizer::remove_duplicate::RemoveDuplicate;
65use crate::optimizer::scan_hint::ScanHintRule;
66use crate::optimizer::string_normalization::StringNormalizationRule;
67use crate::optimizer::transcribe_atat::TranscribeAtatRule;
68use crate::optimizer::type_conversion::TypeConversionRule;
69use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
70use crate::options::QueryOptions as QueryOptionsNew;
71use crate::query_engine::DefaultSerializer;
72use crate::query_engine::options::QueryOptions;
73use crate::range_select::planner::RangeSelectPlanner;
74use crate::region_query::RegionQueryHandlerRef;
75
76#[derive(Clone)]
78pub struct QueryEngineState {
79 df_context: SessionContext,
80 catalog_manager: CatalogManagerRef,
81 function_state: Arc<FunctionState>,
82 scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
83 aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
84 table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
85 extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
86 plugins: Plugins,
87}
88
89impl fmt::Debug for QueryEngineState {
90 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
91 f.debug_struct("QueryEngineState")
92 .field("state", &self.df_context.state())
93 .finish()
94 }
95}
96
97impl QueryEngineState {
98 #[allow(clippy::too_many_arguments)]
99 pub fn new(
100 catalog_list: CatalogManagerRef,
101 partition_rule_manager: Option<PartitionRuleManagerRef>,
102 region_query_handler: Option<RegionQueryHandlerRef>,
103 table_mutation_handler: Option<TableMutationHandlerRef>,
104 procedure_service_handler: Option<ProcedureServiceHandlerRef>,
105 flow_service_handler: Option<FlowServiceHandlerRef>,
106 with_dist_planner: bool,
107 plugins: Plugins,
108 options: QueryOptionsNew,
109 ) -> Self {
110 let total_memory = get_total_memory_bytes().max(0) as u64;
111 let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
112 let runtime_env = if memory_pool_size > 0 {
113 Arc::new(
114 RuntimeEnvBuilder::new()
115 .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
116 .build()
117 .expect("Failed to build RuntimeEnv"),
118 )
119 } else {
120 Arc::new(RuntimeEnv::default())
121 };
122 let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
123 if options.parallelism > 0 {
124 session_config = session_config.with_target_partitions(options.parallelism);
125 }
126 if options.allow_query_fallback {
127 session_config
128 .options_mut()
129 .extensions
130 .insert(DistPlannerOptions {
131 allow_query_fallback: true,
132 });
133 }
134
135 session_config
138 .options_mut()
139 .execution
140 .skip_physical_aggregate_schema_check = true;
141
142 let mut extension_rules = Vec::new();
144
145 extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
147
148 let mut analyzer = Analyzer::new();
150 analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
151 analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
152 analyzer
153 .rules
154 .insert(0, Arc::new(CountWildcardToTimeIndexRule));
155
156 if with_dist_planner {
157 analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
158 }
159
160 analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
161
162 let mut optimizer = Optimizer::new();
163 optimizer.rules.push(Arc::new(ScanHintRule));
164
165 let mut physical_optimizer = PhysicalOptimizer::new();
167 physical_optimizer
169 .rules
170 .insert(5, Arc::new(ParallelizeScan));
171 physical_optimizer
173 .rules
174 .insert(6, Arc::new(PassDistribution));
175 physical_optimizer.rules.insert(
177 7,
178 Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
179 );
180 physical_optimizer
182 .rules
183 .push(Arc::new(WindowedSortPhysicalRule));
184 physical_optimizer
185 .rules
186 .push(Arc::new(MatchesConstantTermOptimizer));
187 physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
189 Self::remove_physical_optimizer_rule(
191 &mut physical_optimizer.rules,
192 SanityCheckPlan {}.name(),
193 );
194 physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
195
196 let session_state = SessionStateBuilder::new()
197 .with_config(session_config)
198 .with_runtime_env(runtime_env)
199 .with_default_features()
200 .with_analyzer_rules(analyzer.rules)
201 .with_serializer_registry(Arc::new(DefaultSerializer))
202 .with_query_planner(Arc::new(DfQueryPlanner::new(
203 catalog_list.clone(),
204 partition_rule_manager,
205 region_query_handler,
206 )))
207 .with_optimizer_rules(optimizer.rules)
208 .with_physical_optimizer_rules(physical_optimizer.rules)
209 .build();
210
211 let df_context = SessionContext::new_with_state(session_state);
212
213 Self {
214 df_context,
215 catalog_manager: catalog_list,
216 function_state: Arc::new(FunctionState {
217 table_mutation_handler,
218 procedure_service_handler,
219 flow_service_handler,
220 }),
221 aggr_functions: Arc::new(RwLock::new(HashMap::new())),
222 table_functions: Arc::new(RwLock::new(HashMap::new())),
223 extension_rules,
224 plugins,
225 scalar_functions: Arc::new(RwLock::new(HashMap::new())),
226 }
227 }
228
229 fn remove_physical_optimizer_rule(
230 rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
231 name: &str,
232 ) {
233 rules.retain(|rule| rule.name() != name);
234 }
235
236 pub fn optimize_by_extension_rules(
238 &self,
239 plan: DfLogicalPlan,
240 context: &QueryEngineContext,
241 ) -> DfResult<DfLogicalPlan> {
242 self.extension_rules
243 .iter()
244 .try_fold(plan, |acc_plan, rule| {
245 rule.analyze(acc_plan, context, self.session_state().config_options())
246 })
247 }
248
249 pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
251 self.session_state().optimize(&plan)
252 }
253
254 pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
256 self.scalar_functions
257 .read()
258 .unwrap()
259 .get(function_name)
260 .cloned()
261 }
262
263 pub fn scalar_names(&self) -> Vec<String> {
265 self.scalar_functions
266 .read()
267 .unwrap()
268 .keys()
269 .cloned()
270 .collect()
271 }
272
273 pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
275 self.aggr_functions
276 .read()
277 .unwrap()
278 .get(function_name)
279 .cloned()
280 }
281
282 pub fn aggr_names(&self) -> Vec<String> {
284 self.aggr_functions
285 .read()
286 .unwrap()
287 .keys()
288 .cloned()
289 .collect()
290 }
291
292 pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
294 self.table_functions
295 .read()
296 .unwrap()
297 .get(function_name)
298 .cloned()
299 }
300
301 pub fn table_function_names(&self) -> Vec<String> {
303 self.table_functions
304 .read()
305 .unwrap()
306 .keys()
307 .cloned()
308 .collect()
309 }
310
311 pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
314 let name = func.name().to_string();
315 let x = self
316 .scalar_functions
317 .write()
318 .unwrap()
319 .insert(name.clone(), func);
320
321 if x.is_some() {
322 warn!("Already registered scalar function '{name}'");
323 }
324 }
325
326 pub fn register_aggr_function(&self, func: AggregateUDF) {
335 let name = func.name().to_string();
336 let x = self
337 .aggr_functions
338 .write()
339 .unwrap()
340 .insert(name.clone(), func);
341 assert!(
342 x.is_none(),
343 "Already registered aggregate function '{name}'"
344 );
345 }
346
347 pub fn register_table_function(&self, func: Arc<TableFunction>) {
348 let name = func.name();
349 let x = self
350 .table_functions
351 .write()
352 .unwrap()
353 .insert(name.to_string(), func.clone());
354
355 if x.is_some() {
356 warn!("Already registered table function '{name}");
357 }
358 }
359
360 pub fn catalog_manager(&self) -> &CatalogManagerRef {
361 &self.catalog_manager
362 }
363
364 pub fn function_state(&self) -> Arc<FunctionState> {
365 self.function_state.clone()
366 }
367
368 pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
370 self.function_state.table_mutation_handler.as_ref()
371 }
372
373 pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
375 self.function_state.procedure_service_handler.as_ref()
376 }
377
378 pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
379 self.plugins
380 .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
381 .unwrap_or(false)
382 }
383
384 pub fn session_state(&self) -> SessionState {
385 self.df_context.state()
386 }
387
388 pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
390 self.df_context
391 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
392 }
393}
394
395struct DfQueryPlanner {
396 physical_planner: DefaultPhysicalPlanner,
397}
398
399impl fmt::Debug for DfQueryPlanner {
400 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
401 f.debug_struct("DfQueryPlanner").finish()
402 }
403}
404
405#[async_trait]
406impl QueryPlanner for DfQueryPlanner {
407 async fn create_physical_plan(
408 &self,
409 logical_plan: &DfLogicalPlan,
410 session_state: &SessionState,
411 ) -> DfResult<Arc<dyn ExecutionPlan>> {
412 self.physical_planner
413 .create_physical_plan(logical_plan, session_state)
414 .await
415 }
416}
417
418impl DfQueryPlanner {
419 fn new(
420 catalog_manager: CatalogManagerRef,
421 partition_rule_manager: Option<PartitionRuleManagerRef>,
422 region_query_handler: Option<RegionQueryHandlerRef>,
423 ) -> Self {
424 let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
425 vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
426 if let (Some(region_query_handler), Some(partition_rule_manager)) =
427 (region_query_handler, partition_rule_manager)
428 {
429 planners.push(Arc::new(DistExtensionPlanner::new(
430 catalog_manager,
431 partition_rule_manager,
432 region_query_handler,
433 )));
434 planners.push(Arc::new(MergeSortExtensionPlanner {}));
435 }
436 Self {
437 physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
438 }
439 }
440}
441
442#[derive(Debug)]
447struct MetricsMemoryPool {
448 inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
449}
450
451impl MetricsMemoryPool {
452 const TOP_CONSUMERS_TO_REPORT: usize = 5;
454
455 fn new(limit: usize) -> Self {
456 Self {
457 inner: Arc::new(TrackConsumersPool::new(
458 GreedyMemoryPool::new(limit),
459 NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
460 )),
461 }
462 }
463
464 #[inline]
465 fn update_metrics(&self) {
466 QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
467 }
468}
469
470impl MemoryPool for MetricsMemoryPool {
471 fn register(&self, consumer: &MemoryConsumer) {
472 self.inner.register(consumer);
473 }
474
475 fn unregister(&self, consumer: &MemoryConsumer) {
476 self.inner.unregister(consumer);
477 }
478
479 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
480 self.inner.grow(reservation, additional);
481 self.update_metrics();
482 }
483
484 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
485 self.inner.shrink(reservation, shrink);
486 self.update_metrics();
487 }
488
489 fn try_grow(
490 &self,
491 reservation: &MemoryReservation,
492 additional: usize,
493 ) -> datafusion_common::Result<()> {
494 let result = self.inner.try_grow(reservation, additional);
495 if result.is_err() {
496 QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
497 }
498 self.update_metrics();
499 result
500 }
501
502 fn reserved(&self) -> usize {
503 self.inner.reserved()
504 }
505
506 fn memory_limit(&self) -> MemoryLimit {
507 self.inner.memory_limit()
508 }
509}