query/query_engine/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::sync::{Arc, RwLock};
18
19use async_trait::async_trait;
20use catalog::CatalogManagerRef;
21use common_base::Plugins;
22use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
23use common_function::function_factory::ScalarFunctionFactory;
24use common_function::handlers::{
25    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
26};
27use common_function::state::FunctionState;
28use common_stat::get_total_memory_bytes;
29use common_telemetry::warn;
30use datafusion::catalog::TableFunction;
31use datafusion::dataframe::DataFrame;
32use datafusion::error::Result as DfResult;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
35use datafusion::execution::memory_pool::{
36    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
37};
38use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
39use datafusion::physical_optimizer::PhysicalOptimizerRule;
40use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
41use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
42use datafusion::physical_plan::ExecutionPlan;
43use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
44use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
45use datafusion_optimizer::analyzer::Analyzer;
46use datafusion_optimizer::optimizer::Optimizer;
47use partition::manager::PartitionRuleManagerRef;
48use promql::extension_plan::PromExtensionPlanner;
49use table::TableRef;
50use table::table::adapter::DfTableProviderAdapter;
51
52use crate::QueryEngineContext;
53use crate::dist_plan::{
54    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
55};
56use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
57use crate::optimizer::ExtensionAnalyzerRule;
58use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
59use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
60use crate::optimizer::parallelize_scan::ParallelizeScan;
61use crate::optimizer::pass_distribution::PassDistribution;
62use crate::optimizer::remove_duplicate::RemoveDuplicate;
63use crate::optimizer::scan_hint::ScanHintRule;
64use crate::optimizer::string_normalization::StringNormalizationRule;
65use crate::optimizer::transcribe_atat::TranscribeAtatRule;
66use crate::optimizer::type_conversion::TypeConversionRule;
67use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
68use crate::options::QueryOptions as QueryOptionsNew;
69use crate::query_engine::DefaultSerializer;
70use crate::query_engine::options::QueryOptions;
71use crate::range_select::planner::RangeSelectPlanner;
72use crate::region_query::RegionQueryHandlerRef;
73
/// Query engine global state
#[derive(Clone)]
pub struct QueryEngineState {
    // The underlying DataFusion session context shared by queries built from this state.
    df_context: SessionContext,
    // Catalog manager handed to the engine at construction; also passed to the
    // distributed extension planner.
    catalog_manager: CatalogManagerRef,
    // Shared handlers (table mutation / procedure / flow services) for functions.
    function_state: Arc<FunctionState>,
    // Registry of scalar function factories, keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    // Registry of aggregate UDFs, keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    // Registry of table functions, keyed by function name.
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    // Extension analyzer rules applied by `optimize_by_extension_rules`
    // before the regular DataFusion optimize phase.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}
86
87impl fmt::Debug for QueryEngineState {
88    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89        f.debug_struct("QueryEngineState")
90            .field("state", &self.df_context.state())
91            .finish()
92    }
93}
94
95impl QueryEngineState {
96    #[allow(clippy::too_many_arguments)]
97    pub fn new(
98        catalog_list: CatalogManagerRef,
99        partition_rule_manager: Option<PartitionRuleManagerRef>,
100        region_query_handler: Option<RegionQueryHandlerRef>,
101        table_mutation_handler: Option<TableMutationHandlerRef>,
102        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
103        flow_service_handler: Option<FlowServiceHandlerRef>,
104        with_dist_planner: bool,
105        plugins: Plugins,
106        options: QueryOptionsNew,
107    ) -> Self {
108        let total_memory = get_total_memory_bytes().max(0) as u64;
109        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
110        let runtime_env = if memory_pool_size > 0 {
111            Arc::new(
112                RuntimeEnvBuilder::new()
113                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
114                    .build()
115                    .expect("Failed to build RuntimeEnv"),
116            )
117        } else {
118            Arc::new(RuntimeEnv::default())
119        };
120        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
121        if options.parallelism > 0 {
122            session_config = session_config.with_target_partitions(options.parallelism);
123        }
124        if options.allow_query_fallback {
125            session_config
126                .options_mut()
127                .extensions
128                .insert(DistPlannerOptions {
129                    allow_query_fallback: true,
130                });
131        }
132
133        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
134        // and we can add that check back once we upgrade datafusion.
135        session_config
136            .options_mut()
137            .execution
138            .skip_physical_aggregate_schema_check = true;
139
140        // Apply extension rules
141        let mut extension_rules = Vec::new();
142
143        // The [`TypeConversionRule`] must be at first
144        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
145
146        // Apply the datafusion rules
147        let mut analyzer = Analyzer::new();
148        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
149        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
150        analyzer
151            .rules
152            .insert(0, Arc::new(CountWildcardToTimeIndexRule));
153
154        if with_dist_planner {
155            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
156        }
157
158        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
159
160        let mut optimizer = Optimizer::new();
161        optimizer.rules.push(Arc::new(ScanHintRule));
162
163        // add physical optimizer
164        let mut physical_optimizer = PhysicalOptimizer::new();
165        // Change TableScan's partition right before enforcing distribution
166        physical_optimizer
167            .rules
168            .insert(5, Arc::new(ParallelizeScan));
169        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
170        physical_optimizer
171            .rules
172            .insert(6, Arc::new(PassDistribution));
173        // Enforce sorting AFTER custom rules that modify the plan structure
174        physical_optimizer.rules.insert(
175            7,
176            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
177        );
178        // Add rule for windowed sort
179        physical_optimizer
180            .rules
181            .push(Arc::new(WindowedSortPhysicalRule));
182        physical_optimizer
183            .rules
184            .push(Arc::new(MatchesConstantTermOptimizer));
185        // Add rule to remove duplicate nodes generated by other rules. Run this in the last.
186        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
187        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
188        Self::remove_physical_optimizer_rule(
189            &mut physical_optimizer.rules,
190            SanityCheckPlan {}.name(),
191        );
192        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
193
194        let session_state = SessionStateBuilder::new()
195            .with_config(session_config)
196            .with_runtime_env(runtime_env)
197            .with_default_features()
198            .with_analyzer_rules(analyzer.rules)
199            .with_serializer_registry(Arc::new(DefaultSerializer))
200            .with_query_planner(Arc::new(DfQueryPlanner::new(
201                catalog_list.clone(),
202                partition_rule_manager,
203                region_query_handler,
204            )))
205            .with_optimizer_rules(optimizer.rules)
206            .with_physical_optimizer_rules(physical_optimizer.rules)
207            .build();
208
209        let df_context = SessionContext::new_with_state(session_state);
210
211        Self {
212            df_context,
213            catalog_manager: catalog_list,
214            function_state: Arc::new(FunctionState {
215                table_mutation_handler,
216                procedure_service_handler,
217                flow_service_handler,
218            }),
219            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
220            table_functions: Arc::new(RwLock::new(HashMap::new())),
221            extension_rules,
222            plugins,
223            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
224        }
225    }
226
227    fn remove_physical_optimizer_rule(
228        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
229        name: &str,
230    ) {
231        rules.retain(|rule| rule.name() != name);
232    }
233
234    /// Optimize the logical plan by the extension anayzer rules.
235    pub fn optimize_by_extension_rules(
236        &self,
237        plan: DfLogicalPlan,
238        context: &QueryEngineContext,
239    ) -> DfResult<DfLogicalPlan> {
240        self.extension_rules
241            .iter()
242            .try_fold(plan, |acc_plan, rule| {
243                rule.analyze(acc_plan, context, self.session_state().config_options())
244            })
245    }
246
247    /// Run the full logical plan optimize phase for the given plan.
248    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
249        self.session_state().optimize(&plan)
250    }
251
252    /// Retrieve the scalar function by name
253    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
254        self.scalar_functions
255            .read()
256            .unwrap()
257            .get(function_name)
258            .cloned()
259    }
260
261    /// Retrieve scalar function names.
262    pub fn scalar_names(&self) -> Vec<String> {
263        self.scalar_functions
264            .read()
265            .unwrap()
266            .keys()
267            .cloned()
268            .collect()
269    }
270
271    /// Retrieve the aggregate function by name
272    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
273        self.aggr_functions
274            .read()
275            .unwrap()
276            .get(function_name)
277            .cloned()
278    }
279
280    /// Retrieve aggregate function names.
281    pub fn aggr_names(&self) -> Vec<String> {
282        self.aggr_functions
283            .read()
284            .unwrap()
285            .keys()
286            .cloned()
287            .collect()
288    }
289
290    /// Retrieve table function by name
291    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
292        self.table_functions
293            .read()
294            .unwrap()
295            .get(function_name)
296            .cloned()
297    }
298
299    /// Retrieve table function names.
300    pub fn table_function_names(&self) -> Vec<String> {
301        self.table_functions
302            .read()
303            .unwrap()
304            .keys()
305            .cloned()
306            .collect()
307    }
308
309    /// Register an scalar function.
310    /// Will override if the function with same name is already registered.
311    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
312        let name = func.name().to_string();
313        let x = self
314            .scalar_functions
315            .write()
316            .unwrap()
317            .insert(name.clone(), func);
318
319        if x.is_some() {
320            warn!("Already registered scalar function '{name}'");
321        }
322    }
323
324    /// Register an aggregate function.
325    ///
326    /// # Panics
327    /// Will panic if the function with same name is already registered.
328    ///
329    /// Panicking consideration: currently the aggregated functions are all statically registered,
330    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
331    /// invariant is broken in the future, we should return an error instead of panicking.
332    pub fn register_aggr_function(&self, func: AggregateUDF) {
333        let name = func.name().to_string();
334        let x = self
335            .aggr_functions
336            .write()
337            .unwrap()
338            .insert(name.clone(), func);
339        assert!(
340            x.is_none(),
341            "Already registered aggregate function '{name}'"
342        );
343    }
344
345    pub fn register_table_function(&self, func: Arc<TableFunction>) {
346        let name = func.name();
347        let x = self
348            .table_functions
349            .write()
350            .unwrap()
351            .insert(name.to_string(), func.clone());
352
353        if x.is_some() {
354            warn!("Already registered table function '{name}");
355        }
356    }
357
358    pub fn catalog_manager(&self) -> &CatalogManagerRef {
359        &self.catalog_manager
360    }
361
362    pub fn function_state(&self) -> Arc<FunctionState> {
363        self.function_state.clone()
364    }
365
366    /// Returns the [`TableMutationHandlerRef`] in state.
367    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
368        self.function_state.table_mutation_handler.as_ref()
369    }
370
371    /// Returns the [`ProcedureServiceHandlerRef`] in state.
372    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
373        self.function_state.procedure_service_handler.as_ref()
374    }
375
376    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
377        self.plugins
378            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
379            .unwrap_or(false)
380    }
381
382    pub fn session_state(&self) -> SessionState {
383        self.df_context.state()
384    }
385
386    /// Create a DataFrame for a table
387    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
388        self.df_context
389            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
390    }
391}
392
/// Query planner installed into the session context: DataFusion's default
/// physical planner extended with GreptimeDB-specific extension planners.
struct DfQueryPlanner {
    // The wrapped planner carrying all registered extension planners.
    physical_planner: DefaultPhysicalPlanner,
}
396
397impl fmt::Debug for DfQueryPlanner {
398    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
399        f.debug_struct("DfQueryPlanner").finish()
400    }
401}
402
403#[async_trait]
404impl QueryPlanner for DfQueryPlanner {
405    async fn create_physical_plan(
406        &self,
407        logical_plan: &DfLogicalPlan,
408        session_state: &SessionState,
409    ) -> DfResult<Arc<dyn ExecutionPlan>> {
410        self.physical_planner
411            .create_physical_plan(logical_plan, session_state)
412            .await
413    }
414}
415
416impl DfQueryPlanner {
417    fn new(
418        catalog_manager: CatalogManagerRef,
419        partition_rule_manager: Option<PartitionRuleManagerRef>,
420        region_query_handler: Option<RegionQueryHandlerRef>,
421    ) -> Self {
422        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
423            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
424        if let (Some(region_query_handler), Some(partition_rule_manager)) =
425            (region_query_handler, partition_rule_manager)
426        {
427            planners.push(Arc::new(DistExtensionPlanner::new(
428                catalog_manager,
429                partition_rule_manager,
430                region_query_handler,
431            )));
432            planners.push(Arc::new(MergeSortExtensionPlanner {}));
433        }
434        Self {
435            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
436        }
437    }
438}
439
/// A wrapper around GreedyMemoryPool that records metrics.
///
/// This wrapper intercepts all memory pool operations and updates
/// Prometheus metrics for monitoring query memory usage and rejections.
#[derive(Debug)]
struct MetricsMemoryPool {
    /// The delegate pool that actually enforces the memory limit.
    inner: Arc<GreedyMemoryPool>,
}
448
449impl MetricsMemoryPool {
450    fn new(limit: usize) -> Self {
451        Self {
452            inner: Arc::new(GreedyMemoryPool::new(limit)),
453        }
454    }
455
456    #[inline]
457    fn update_metrics(&self) {
458        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
459    }
460}
461
462impl MemoryPool for MetricsMemoryPool {
463    fn register(&self, consumer: &MemoryConsumer) {
464        self.inner.register(consumer);
465    }
466
467    fn unregister(&self, consumer: &MemoryConsumer) {
468        self.inner.unregister(consumer);
469    }
470
471    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
472        self.inner.grow(reservation, additional);
473        self.update_metrics();
474    }
475
476    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
477        self.inner.shrink(reservation, shrink);
478        self.update_metrics();
479    }
480
481    fn try_grow(
482        &self,
483        reservation: &MemoryReservation,
484        additional: usize,
485    ) -> datafusion_common::Result<()> {
486        let result = self.inner.try_grow(reservation, additional);
487        if result.is_err() {
488            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
489        }
490        self.update_metrics();
491        result
492    }
493
494    fn reserved(&self) -> usize {
495        self.inner.reserved()
496    }
497
498    fn memory_limit(&self) -> MemoryLimit {
499        self.inner.memory_limit()
500    }
501}