query/query_engine/state.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::handlers::{
    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::state::FunctionState;
use common_stat::get_total_memory_bytes;
use common_telemetry::warn;
use datafusion::catalog::TableFunction;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
    TrackConsumersPool,
};
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;

use crate::QueryEngineContext;
use crate::dist_plan::{
    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::pass_distribution::PassDistribution;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::transcribe_atat::TranscribeAtatRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
use crate::options::QueryOptions as QueryOptionsNew;
use crate::query_engine::DefaultSerializer;
use crate::query_engine::options::QueryOptions;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;

/// Query engine global state
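///
/// Cloning is intended to be cheap: the heavy members are behind [`Arc`]s (or
/// are themselves handle types), so clones share the same underlying session
/// context, function registries and plugins.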
#[derive(Clone)]
pub struct QueryEngineState {
    df_context: SessionContext,
    catalog_manager: CatalogManagerRef,
    function_state: Arc<FunctionState>,
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    plugins: Plugins,
}

impl fmt::Debug for QueryEngineState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("QueryEngineState")
            .field("state", &self.df_context.state())
            .finish()
    }
}

impl QueryEngineState {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_list: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
        options: QueryOptionsNew,
    ) -> Self {
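        // Build the DataFusion runtime environment. When the configured memory
        // pool size resolves to a non-zero limit, wrap the pool in
        // `MetricsMemoryPool` (defined below) so usage and rejections are
        // exported as metrics; otherwise fall back to the default, unbounded
        // runtime.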
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
        let runtime_env = if memory_pool_size > 0 {
            Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
                    .build()
                    .expect("Failed to build RuntimeEnv"),
            )
        } else {
            Arc::new(RuntimeEnv::default())
        };
        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
        if options.parallelism > 0 {
            session_config = session_config.with_target_partitions(options.parallelism);
        }
        if options.allow_query_fallback {
            session_config
                .options_mut()
                .extensions
                .insert(DistPlannerOptions {
                    allow_query_fallback: true,
                });
        }

        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
        // and we can add that check back once we upgrade datafusion.
        session_config
            .options_mut()
            .execution
            .skip_physical_aggregate_schema_check = true;

        // Apply extension rules
        let mut extension_rules = Vec::new();

        // The [`TypeConversionRule`] must come first
        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);

        // Apply the datafusion rules
        let mut analyzer = Analyzer::new();
        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
        analyzer
            .rules
            .insert(0, Arc::new(CountWildcardToTimeIndexRule));
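        // Note: each `insert(0, ..)` prepends, so the effective order of the
        // custom rules is the reverse of insertion: `CountWildcardToTimeIndexRule`
        // runs first, then `StringNormalizationRule`, then `TranscribeAtatRule`,
        // all ahead of DataFusion's built-in analyzer rules.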

        if with_dist_planner {
            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
        }

        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));

        let mut optimizer = Optimizer::new();
        optimizer.rules.push(Arc::new(ScanHintRule));

        // add physical optimizer
        let mut physical_optimizer = PhysicalOptimizer::new();
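        // The indices used below (5, 6, 7) are offsets into DataFusion's
        // default physical optimizer rule list, so they are sensitive to
        // DataFusion upgrades that add or reorder built-in rules.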
        // Change TableScan's partition right before enforcing distribution
        physical_optimizer
            .rules
            .insert(5, Arc::new(ParallelizeScan));
        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
        physical_optimizer
            .rules
            .insert(6, Arc::new(PassDistribution));
        // Enforce sorting AFTER custom rules that modify the plan structure
        physical_optimizer.rules.insert(
            7,
            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
        );
        // Add rule for windowed sort
        physical_optimizer
            .rules
            .push(Arc::new(WindowedSortPhysicalRule));
        physical_optimizer
            .rules
            .push(Arc::new(MatchesConstantTermOptimizer));
        // Add rule to remove duplicate nodes generated by other rules. Run this last.
        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
        Self::remove_physical_optimizer_rule(
            &mut physical_optimizer.rules,
            SanityCheckPlan {}.name(),
        );
        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));

        let session_state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(runtime_env)
            .with_default_features()
            .with_analyzer_rules(analyzer.rules)
            .with_serializer_registry(Arc::new(DefaultSerializer))
            .with_query_planner(Arc::new(DfQueryPlanner::new(
                catalog_list.clone(),
                partition_rule_manager,
                region_query_handler,
            )))
            .with_optimizer_rules(optimizer.rules)
            .with_physical_optimizer_rules(physical_optimizer.rules)
            .build();

        let df_context = SessionContext::new_with_state(session_state);
        register_function_aliases(&df_context);

        Self {
            df_context,
            catalog_manager: catalog_list,
            function_state: Arc::new(FunctionState {
                table_mutation_handler,
                procedure_service_handler,
                flow_service_handler,
            }),
            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
            table_functions: Arc::new(RwLock::new(HashMap::new())),
            extension_rules,
            plugins,
            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    fn remove_physical_optimizer_rule(
        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
        name: &str,
    ) {
        rules.retain(|rule| rule.name() != name);
    }

    /// Optimize the logical plan by the extension analyzer rules.
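    ///
    /// A minimal usage sketch (not a doctest, since it needs a running engine);
    /// `state` is a `QueryEngineState` and `ctx` a [`QueryEngineContext`]
    /// supplied by the caller:
    ///
    /// ```ignore
    /// let plan = state.optimize_by_extension_rules(df_plan, &ctx)?;
    /// // Then run DataFusion's own optimizer passes:
    /// let plan = state.optimize_logical_plan(plan)?;
    /// ```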
    pub fn optimize_by_extension_rules(
        &self,
        plan: DfLogicalPlan,
        context: &QueryEngineContext,
    ) -> DfResult<DfLogicalPlan> {
        self.extension_rules
            .iter()
            .try_fold(plan, |acc_plan, rule| {
                rule.analyze(acc_plan, context, self.session_state().config_options())
            })
    }

    /// Run the full logical plan optimize phase for the given plan.
    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
        self.session_state().optimize(&plan)
    }

    /// Retrieve the scalar function by name
    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
        self.scalar_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve scalar function names.
    pub fn scalar_names(&self) -> Vec<String> {
        self.scalar_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve the aggregate function by name
    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
        self.aggr_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve aggregate function names.
    pub fn aggr_names(&self) -> Vec<String> {
        self.aggr_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Retrieve table function by name
    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
        self.table_functions
            .read()
            .unwrap()
            .get(function_name)
            .cloned()
    }

    /// Retrieve table function names.
    pub fn table_function_names(&self) -> Vec<String> {
        self.table_functions
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }

    /// Register a scalar function.
    /// Overrides any function already registered under the same name (a warning is logged).
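    ///
    /// A minimal sketch; `my_factory` is a hypothetical [`ScalarFunctionFactory`]
    /// whose function is named `my_func`:
    ///
    /// ```ignore
    /// state.register_scalar_function(my_factory);
    /// assert!(state.scalar_function("my_func").is_some());
    /// ```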
    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
        let name = func.name().to_string();
        let x = self
            .scalar_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);

        if x.is_some() {
            warn!("Already registered scalar function '{name}'");
        }
    }

    /// Register an aggregate function.
    ///
    /// # Panics
    /// Will panic if a function with the same name is already registered.
    ///
    /// Panicking consideration: currently the aggregate functions are all statically registered;
    /// users cannot define their own aggregate functions on the fly, so panicking here is
    /// acceptable. If that invariant is broken in the future, we should return an error instead
    /// of panicking.
    pub fn register_aggr_function(&self, func: AggregateUDF) {
        let name = func.name().to_string();
        let x = self
            .aggr_functions
            .write()
            .unwrap()
            .insert(name.clone(), func);
        assert!(
            x.is_none(),
            "Already registered aggregate function '{name}'"
        );
    }

    pub fn register_table_function(&self, func: Arc<TableFunction>) {
        let name = func.name();
        let x = self
            .table_functions
            .write()
            .unwrap()
            .insert(name.to_string(), func.clone());

        if x.is_some() {
            warn!("Already registered table function '{name}'");
        }
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn function_state(&self) -> Arc<FunctionState> {
        self.function_state.clone()
    }

    /// Returns the [`TableMutationHandlerRef`] in state.
    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
        self.function_state.table_mutation_handler.as_ref()
    }

    /// Returns the [`ProcedureServiceHandlerRef`] in state.
    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
        self.function_state.procedure_service_handler.as_ref()
    }

    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
        self.plugins
            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
            .unwrap_or(false)
    }

    pub fn session_state(&self) -> SessionState {
        self.df_context.state()
    }

    /// Create a DataFrame for a table
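    ///
    /// A minimal sketch of consuming the result with the standard DataFusion
    /// `DataFrame` API; `table` is assumed to be a [`TableRef`] obtained from
    /// the catalog:
    ///
    /// ```ignore
    /// let df = state.read_table(table)?;
    /// let batches = df.collect().await?;
    /// ```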
    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
        self.df_context
            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
    }
}

struct DfQueryPlanner {
    physical_planner: DefaultPhysicalPlanner,
}

impl fmt::Debug for DfQueryPlanner {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("DfQueryPlanner").finish()
    }
}

#[async_trait]
impl QueryPlanner for DfQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &DfLogicalPlan,
        session_state: &SessionState,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        self.physical_planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

/// MySQL-compatible scalar function aliases: (target_name, alias)
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];

/// MySQL-compatible aggregate function aliases: (target_name, alias)
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];

/// Register function aliases.
///
/// This function adds aliases like `ucase` -> `upper`, `lcase` -> `lower`, etc.
/// to make GreptimeDB more compatible with MySQL syntax.
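///
/// After registration, a query such as `SELECT ucase('a')` resolves to the
/// same UDF as `SELECT upper('a')`.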
fn register_function_aliases(ctx: &SessionContext) {
    let state = ctx.state();

    for (target, alias) in SCALAR_FUNCTION_ALIASES {
        if let Some(func) = state.scalar_functions().get(*target) {
            let aliased = func.as_ref().clone().with_aliases([*alias]);
            ctx.register_udf(aliased);
        }
    }

    for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
        if let Some(func) = state.aggregate_functions().get(*target) {
            let aliased = func.as_ref().clone().with_aliases([*alias]);
            ctx.register_udaf(aliased);
        }
    }
}

impl DfQueryPlanner {
    fn new(
        catalog_manager: CatalogManagerRef,
        partition_rule_manager: Option<PartitionRuleManagerRef>,
        region_query_handler: Option<RegionQueryHandlerRef>,
    ) -> Self {
        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
            vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
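        // Distributed planning is only enabled when both the region query
        // handler and the partition rule manager are available; otherwise the
        // planner falls back to the base extension planners above.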
        if let (Some(region_query_handler), Some(partition_rule_manager)) =
            (region_query_handler, partition_rule_manager)
        {
            planners.push(Arc::new(DistExtensionPlanner::new(
                catalog_manager,
                partition_rule_manager,
                region_query_handler,
            )));
            planners.push(Arc::new(MergeSortExtensionPlanner {}));
        }
        Self {
            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
        }
    }
}

/// A wrapper around TrackConsumersPool that records metrics.
///
/// This wrapper intercepts all memory pool operations and updates
/// Prometheus metrics for monitoring query memory usage and rejections.
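///
/// Every `grow`/`shrink`/`try_grow` refreshes [`QUERY_MEMORY_POOL_USAGE_BYTES`]
/// with the pool's reserved bytes, and a failed `try_grow` additionally
/// increments [`QUERY_MEMORY_POOL_REJECTED_TOTAL`] (see the [`MemoryPool`]
/// impl below).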
#[derive(Debug)]
struct MetricsMemoryPool {
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}

impl MetricsMemoryPool {
    // Number of top memory consumers to report in OOM error messages
    const TOP_CONSUMERS_TO_REPORT: usize = 5;

    fn new(limit: usize) -> Self {
        Self {
            inner: Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(limit),
                NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
            )),
        }
    }

    #[inline]
    fn update_metrics(&self) {
        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
    }
}

impl MemoryPool for MetricsMemoryPool {
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
    }

    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.inner.grow(reservation, additional);
        self.update_metrics();
    }

    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.inner.shrink(reservation, shrink);
        self.update_metrics();
    }

    fn try_grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) -> datafusion_common::Result<()> {
        let result = self.inner.try_grow(reservation, additional);
        if result.is_err() {
            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
        }
        self.update_metrics();
        result
    }

    fn reserved(&self) -> usize {
        self.inner.reserved()
    }

    fn memory_limit(&self) -> MemoryLimit {
        self.inner.memory_limit()
    }
}