// query/query_engine/state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39    TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use table::TableRef;
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::QueryEngineContext;
57use crate::dist_plan::{
58    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
59};
60use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
61use crate::optimizer::ExtensionAnalyzerRule;
62use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
63use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
64use crate::optimizer::parallelize_scan::ParallelizeScan;
65use crate::optimizer::pass_distribution::PassDistribution;
66use crate::optimizer::remove_duplicate::RemoveDuplicate;
67use crate::optimizer::scan_hint::ScanHintRule;
68use crate::optimizer::string_normalization::StringNormalizationRule;
69use crate::optimizer::transcribe_atat::TranscribeAtatRule;
70use crate::optimizer::type_conversion::TypeConversionRule;
71use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
72use crate::options::QueryOptions as QueryOptionsNew;
73use crate::query_engine::DefaultSerializer;
74use crate::query_engine::options::QueryOptions;
75use crate::range_select::planner::RangeSelectPlanner;
76use crate::region_query::RegionQueryHandlerRef;
77
/// Query engine global state
///
/// Cloning is cheap: all heavyweight members are behind `Arc`s and the
/// `SessionContext` itself is internally shared.
#[derive(Clone)]
pub struct QueryEngineState {
    /// DataFusion session shared by queries; carries the session config,
    /// runtime env (memory pool) and the registered rules/planner.
    df_context: SessionContext,
    /// Access to catalogs, schemas and tables.
    catalog_manager: CatalogManagerRef,
    /// Optional service handlers (table mutation, procedure, flow) made
    /// available to functions at execution time.
    function_state: Arc<FunctionState>,
    /// Scalar UDF registry keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    /// Aggregate UDF registry keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    /// Table function registry keyed by function name.
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    /// Extra logical-plan analyzer rules applied by
    /// `optimize_by_extension_rules` before DataFusion's own optimization.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    /// Engine plugins (e.g. a deployment-level `QueryOptions`).
    plugins: Plugins,
}
90
91impl fmt::Debug for QueryEngineState {
92    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93        f.debug_struct("QueryEngineState")
94            .field("state", &self.df_context.state())
95            .finish()
96    }
97}
98
impl QueryEngineState {
    /// Creates the engine state: builds the DataFusion session (runtime env
    /// with an optional metered memory pool, analyzer/optimizer/physical
    /// optimizer rule sets, and the custom query planner) and wires in the
    /// catalog manager plus the optional service handlers.
    ///
    /// `with_dist_planner` enables the distributed planner analyzer rule;
    /// `options` controls parallelism, memory pool sizing and the query
    /// fallback policy.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
        // Resolve the configured pool size against the host's total memory.
        // A resolved size of 0 means "no limit": fall back to the default
        // RuntimeEnv with an unbounded pool.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
        let runtime_env = if memory_pool_size > 0 {
            Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
                    .build()
                    .expect("Failed to build RuntimeEnv"),
            )
        } else {
            Arc::new(RuntimeEnv::default())
        };
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
        // and we can add that check back once we upgrade datafusion.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Apply extension rules
        let mut extension_rules = Vec::new();

        // The [`TypeConversionRule`] must be at first
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Apply the datafusion rules. Note that each `insert(0, ..)` prepends,
        // so the LAST one inserted here runs FIRST.
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));

        // Add ApplyFunctionRewrites rule,
        // Note we cannot use `analyzer.add_function_rewrite`
        // because only rules are copied into session_state
        analyzer.rules.insert(
            0,
            Arc::new(ApplyFunctionRewrites::new(
                FUNCTION_REGISTRY.function_rewrites(),
            )),
        );

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }
        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // add physical optimizer
        // NOTE(review): the indices 5/6/7 below assume DataFusion's default
        // physical optimizer rule ordering — revisit on DataFusion upgrades.
        let mut physical_optimizer = PhysicalOptimizer::new();
        // Change TableScan's partition right before enforcing distribution
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        // Enforce sorting AFTER custom rules that modify the plan structure
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        // Add rule for windowed sort
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        // explicitly not do filter pushdown for windowed sort&part sort
        // (notice that `PartSortExec` create another new dyn filter that need to be pushdown if want to use dyn filter optimization)
        // benchmark shows it can cause performance regression due to useless filtering and extra shuffle.
        // We can add a rule to do filter pushdown for windowed sort in the future if we find a way to avoid the performance regression.
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        // Add rule to remove duplicate nodes generated by other rules. Run this in the last.
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);
        // Register MySQL-compatible aliases (ucase, lcase, ...) on the context.
        register_function_aliases(&df_context);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            table_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Removes every physical optimizer rule whose `name()` matches `name`.
    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Optimize the logical plan by the extension analyzer rules.
    ///
    /// Rules are applied in registration order; the first rule that fails
    /// short-circuits with its error.
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Run the full logical plan optimize phase for the given plan.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Retrieve the scalar function by name
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve scalar function names.
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve the aggregate function by name
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve aggregate function names.
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve table function by name
    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
        self.table_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve table function names.
    pub fn table_function_names(&self) -> Vec<String> {
        self.table_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Register a scalar function.
    /// Will override if the function with same name is already registered
    /// (a warning is logged in that case).
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Register an aggregate function.
    ///
    /// # Panics
    /// Will panic if the function with same name is already registered.
    ///
    /// Panicking consideration: currently the aggregated functions are all statically registered,
    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
    /// invariant is broken in the future, we should return an error instead of panicking.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    /// Register a table function.
    /// Will override if the function with same name is already registered
    /// (a warning is logged in that case).
    pub fn register_table_function(&self, func: Arc<TableFunction>) {
        let name = func.name();
        let x = self
            .table_functions
            .write()
            .unwrap()
            .insert(name.to_string(), func.clone());

        if x.is_some() {
            warn!("Already registered table function '{name}'");
        }
    }

    /// Register a window function (UDWF) directly on the DataFusion SessionContext.
    ///
    /// This makes the function visible via `session_state.window_functions()`,
    /// which is used by `DfContextProviderAdapter::get_window_meta`.
    pub fn register_window_function(&self, func: WindowUDF) {
        self.df_context.register_udwf(func);
    }

    /// Returns the catalog manager backing this engine.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the shared [`FunctionState`] (service handlers for functions).
    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the [`TableMutationHandlerRef`] in state.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the [`ProcedureServiceHandlerRef`] in state.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    /// Whether queries across catalogs are forbidden, as configured by the
    /// `QueryOptions` plugin; defaults to `false` when the plugin is absent.
    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    /// Returns a snapshot of the DataFusion session state.
    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Create a DataFrame for a table
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}
418
/// Query planner that delegates to DataFusion's [`DefaultPhysicalPlanner`],
/// extended with GreptimeDB-specific extension planners (PromQL, range select,
/// and — when configured — distributed planning and merge sort).
struct DfQueryPlanner {
    // The wrapped planner carrying the extension planners; see `DfQueryPlanner::new`.
    physical_planner: DefaultPhysicalPlanner,
}
422
423impl fmt::Debug for DfQueryPlanner {
424    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
425        f.debug_struct("DfQueryPlanner").finish()
426    }
427}
428
#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    /// Lowers an optimized logical plan into a physical [`ExecutionPlan`]
    /// by delegating to the wrapped `DefaultPhysicalPlanner` (which consults
    /// the registered extension planners for custom plan nodes).
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
441
/// MySQL-compatible scalar function aliases: (target_name, alias)
///
/// Each alias is attached to the already-registered target UDF by
/// [`register_function_aliases`]; targets missing from the session are
/// silently skipped.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
450
/// MySQL-compatible aggregate function aliases: (target_name, alias)
///
/// Note: in MySQL, `STD` and `VARIANCE` are synonyms for the *population*
/// variants (`STDDEV_POP` / `VAR_POP`), which is what these mappings encode.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];
454
455/// Register function aliases.
456///
457/// This function adds aliases like `ucase` -> `upper`, `lcase` -> `lower`, etc.
458/// to make GreptimeDB more compatible with MySQL syntax.
459fn register_function_aliases(ctx: &SessionContext) {
460    let state = ctx.state();
461
462    for (target, alias) in SCALAR_FUNCTION_ALIASES {
463        if let Some(func) = state.scalar_functions().get(*target) {
464            let aliased = func.as_ref().clone().with_aliases([*alias]);
465            ctx.register_udf(aliased);
466        }
467    }
468
469    for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
470        if let Some(func) = state.aggregate_functions().get(*target) {
471            let aliased = func.as_ref().clone().with_aliases([*alias]);
472            ctx.register_udaf(aliased);
473        }
474    }
475}
476
477impl DfQueryPlanner {
478    fn new(
479        catalog_manager: CatalogManagerRef,
480        partition_rule_manager: Option<PartitionRuleManagerRef>,
481        region_query_handler: Option<RegionQueryHandlerRef>,
482    ) -> Self {
483        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
484            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
485        if let (Some(region_query_handler), Some(partition_rule_manager)) =
486            (region_query_handler, partition_rule_manager)
487        {
488            planners.push(Arc::new(DistExtensionPlanner::new(
489                catalog_manager,
490                partition_rule_manager,
491                region_query_handler,
492            )));
493            planners.push(Arc::new(MergeSortExtensionPlanner {}));
494        }
495        Self {
496            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
497        }
498    }
499}
500
/// A wrapper around TrackConsumersPool that records metrics.
///
/// This wrapper intercepts all memory pool operations and updates
/// Prometheus metrics for monitoring query memory usage and rejections.
#[derive(Debug)]
struct MetricsMemoryPool {
    // Greedy pool wrapped by a consumer tracker so that OOM errors can name
    // the top memory consumers.
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}
509
510impl MetricsMemoryPool {
511    // Number of top memory consumers to report in OOM error messages
512    const TOP_CONSUMERS_TO_REPORT: usize = 5;
513
514    fn new(limit: usize) -> Self {
515        Self {
516            inner: Arc::new(TrackConsumersPool::new(
517                GreedyMemoryPool::new(limit),
518                NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
519            )),
520        }
521    }
522
523    #[inline]
524    fn update_metrics(&self) {
525        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
526    }
527}
528
529impl MemoryPool for MetricsMemoryPool {
530    fn register(&self, consumer: &MemoryConsumer) {
531        self.inner.register(consumer);
532    }
533
534    fn unregister(&self, consumer: &MemoryConsumer) {
535        self.inner.unregister(consumer);
536    }
537
538    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
539        self.inner.grow(reservation, additional);
540        self.update_metrics();
541    }
542
543    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
544        self.inner.shrink(reservation, shrink);
545        self.update_metrics();
546    }
547
548    fn try_grow(
549        &self,
550        reservation: &MemoryReservation,
551        additional: usize,
552    ) -> datafusion_common::Result<()> {
553        let result = self.inner.try_grow(reservation, additional);
554        if result.is_err() {
555            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
556        }
557        self.update_metrics();
558        result
559    }
560
561    fn reserved(&self) -> usize {
562        self.inner.reserved()
563    }
564
565    fn memory_limit(&self) -> MemoryLimit {
566        self.inner.memory_limit()
567    }
568}