Skip to main content

query/query_engine/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::sync::{Arc, RwLock};
19
20use async_trait::async_trait;
21use catalog::CatalogManagerRef;
22use common_base::Plugins;
23use common_function::aggrs::aggr_wrapper::fix_order::FixStateUdafOrderingAnalyzer;
24use common_function::function_factory::ScalarFunctionFactory;
25use common_function::function_registry::FUNCTION_REGISTRY;
26use common_function::handlers::{
27    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
28};
29use common_function::state::FunctionState;
30use common_stat::get_total_memory_bytes;
31use common_telemetry::warn;
32use datafusion::catalog::TableFunction;
33use datafusion::dataframe::DataFrame;
34use datafusion::error::Result as DfResult;
35use datafusion::execution::SessionStateBuilder;
36use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
37use datafusion::execution::memory_pool::{
38    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
39    TrackConsumersPool,
40};
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion::physical_optimizer::PhysicalOptimizerRule;
43use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
44use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
45use datafusion::physical_plan::ExecutionPlan;
46use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
47use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan, WindowUDF};
48use datafusion_optimizer::Analyzer;
49use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
50use datafusion_optimizer::optimizer::Optimizer;
51use partition::manager::PartitionRuleManagerRef;
52use promql::extension_plan::PromExtensionPlanner;
53use session::context::QueryContextRef;
54use table::TableRef;
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::QueryEngineContext;
58use crate::dist_plan::{
59    DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
60    MergeSortExtensionPlanner, RemoteDynFilterReceiverExtensionPlanner,
61    RemoteDynFilterRegistryLease,
62};
63use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
64use crate::optimizer::ExtensionAnalyzerRule;
65use crate::optimizer::const_normalization::ConstNormalizationRule;
66use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
67use crate::optimizer::count_nest_aggr::CountNestAggrRule;
68use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
69use crate::optimizer::json_type_concretize::JsonTypeConcretizeRule;
70use crate::optimizer::parallelize_scan::ParallelizeScan;
71use crate::optimizer::pass_distribution::PassDistribution;
72use crate::optimizer::promql_tsid_narrow_join::PromqlTsidNarrowJoin;
73use crate::optimizer::remove_duplicate::RemoveDuplicate;
74use crate::optimizer::scan_hint::ScanHintRule;
75use crate::optimizer::string_normalization::StringNormalizationRule;
76use crate::optimizer::transcribe_atat::TranscribeAtatRule;
77use crate::optimizer::type_conversion::TypeConversionRule;
78use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
79use crate::options::QueryOptions as QueryOptionsNew;
80use crate::query_engine::DefaultSerializer;
81use crate::query_engine::options::QueryOptions;
82use crate::range_select::planner::RangeSelectPlanner;
83use crate::region_query::RegionQueryHandlerRef;
84
/// Query engine global state
///
/// `Clone` is derived; the function registries and the dyn-filter registry manager are held
/// behind `Arc`, so clones of this state share them.
#[derive(Clone)]
pub struct QueryEngineState {
    /// DataFusion session context assembled in [`QueryEngineState::new`].
    df_context: SessionContext,
    /// Catalog manager handed to [`QueryEngineState::new`].
    catalog_manager: CatalogManagerRef,
    /// Source of the leases returned by `acquire_remote_dyn_filter_registry_lease`.
    dyn_filter_registry_manager: Arc<DynFilterRegistryManager>,
    /// Bundles the table mutation, procedure service and flow service handlers.
    function_state: Arc<FunctionState>,
    /// Scalar function factories keyed by function name.
    scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
    /// Aggregate functions keyed by function name.
    aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
    /// Table functions keyed by function name.
    table_functions: Arc<RwLock<HashMap<String, Arc<TableFunction>>>>,
    /// Extension analyzer rules, applied in order by `optimize_by_extension_rules`.
    extension_rules: Vec<Arc<dyn ExtensionAnalyzerRule + Send + Sync>>,
    /// Plugins; consulted for `QueryOptions` in `disallow_cross_catalog_query`.
    plugins: Plugins,
}
98
99impl fmt::Debug for QueryEngineState {
100    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101        f.debug_struct("QueryEngineState")
102            .field("state", &self.df_context.state())
103            .finish()
104    }
105}
106
107impl QueryEngineState {
108    #[allow(clippy::too_many_arguments)]
109    pub fn new(
110        catalog_list: CatalogManagerRef,
111        partition_rule_manager: Option<PartitionRuleManagerRef>,
112        region_query_handler: Option<RegionQueryHandlerRef>,
113        table_mutation_handler: Option<TableMutationHandlerRef>,
114        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
115        flow_service_handler: Option<FlowServiceHandlerRef>,
116        with_dist_planner: bool,
117        plugins: Plugins,
118        options: QueryOptionsNew,
119    ) -> Self {
120        let total_memory = get_total_memory_bytes().max(0) as u64;
121        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
122        let runtime_env = if memory_pool_size > 0 {
123            Arc::new(
124                RuntimeEnvBuilder::new()
125                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
126                    .build()
127                    .expect("Failed to build RuntimeEnv"),
128            )
129        } else {
130            Arc::new(RuntimeEnv::default())
131        };
132        let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
133        if options.parallelism > 0 {
134            session_config = session_config.with_target_partitions(options.parallelism);
135        }
136        if options.allow_query_fallback {
137            session_config
138                .options_mut()
139                .extensions
140                .insert(DistPlannerOptions {
141                    allow_query_fallback: true,
142                });
143        }
144
145        // todo(hl): This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/5659
146        // and we can add that check back once we upgrade datafusion.
147        session_config
148            .options_mut()
149            .execution
150            .skip_physical_aggregate_schema_check = true;
151
152        // Apply extension rules
153        let mut extension_rules = Vec::new();
154
155        // The [`TypeConversionRule`] must be at first
156        extension_rules.insert(0, Arc::new(TypeConversionRule) as _);
157        extension_rules.push(Arc::new(CountNestAggrRule) as _);
158
159        // Apply the datafusion rules
160        let mut analyzer = Analyzer::new();
161        analyzer.rules.insert(0, Arc::new(TranscribeAtatRule));
162        analyzer.rules.insert(0, Arc::new(StringNormalizationRule));
163        analyzer
164            .rules
165            .insert(0, Arc::new(CountWildcardToTimeIndexRule));
166        analyzer.rules.push(Arc::new(ConstNormalizationRule));
167
168        // Add ApplyFunctionRewrites rule,
169        // Note we cannot use `analyzer.add_function_rewrite`
170        // because only rules are copied into session_state
171        analyzer.rules.insert(
172            0,
173            Arc::new(ApplyFunctionRewrites::new(
174                FUNCTION_REGISTRY.function_rewrites(),
175            )),
176        );
177
178        if with_dist_planner {
179            analyzer.rules.push(Arc::new(DistPlannerAnalyzer));
180        }
181        analyzer.rules.push(Arc::new(FixStateUdafOrderingAnalyzer));
182
183        let mut optimizer = Optimizer::new();
184        optimizer.rules.push(Arc::new(ScanHintRule));
185        optimizer.rules.push(Arc::new(JsonTypeConcretizeRule));
186
187        // add physical optimizer
188        let mut physical_optimizer = PhysicalOptimizer::new();
189        // Change TableScan's partition right before enforcing distribution
190        physical_optimizer
191            .rules
192            .insert(5, Arc::new(ParallelizeScan));
193        // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
194        physical_optimizer
195            .rules
196            .insert(6, Arc::new(PassDistribution));
197        // Prefer collecting narrow PromQL build sides over repartitioning wide label streams.
198        physical_optimizer
199            .rules
200            .insert(7, Arc::new(PromqlTsidNarrowJoin));
201        // Enforce sorting AFTER custom rules that modify the plan structure
202        physical_optimizer.rules.insert(
203            8,
204            Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
205        );
206        // Add rule for windowed sort
207        physical_optimizer
208            .rules
209            .push(Arc::new(WindowedSortPhysicalRule));
210        // explicitly not do filter pushdown for windowed sort&part sort
211        // (notice that `PartSortExec` create another new dyn filter that need to be pushdown if want to use dyn filter optimization)
212        // benchmark shows it can cause performance regression due to useless filtering and extra shuffle.
213        // We can add a rule to do filter pushdown for windowed sort in the future if we find a way to avoid the performance regression.
214        physical_optimizer
215            .rules
216            .push(Arc::new(MatchesConstantTermOptimizer));
217        // Add rule to remove duplicate nodes generated by other rules. Run this in the last.
218        physical_optimizer.rules.push(Arc::new(RemoveDuplicate));
219        // Place SanityCheckPlan at the end of the list to ensure that it runs after all other rules.
220        Self::remove_physical_optimizer_rule(
221            &mut physical_optimizer.rules,
222            SanityCheckPlan {}.name(),
223        );
224        physical_optimizer.rules.push(Arc::new(SanityCheckPlan {}));
225
226        let session_state = SessionStateBuilder::new()
227            .with_config(session_config)
228            .with_runtime_env(runtime_env)
229            .with_default_features()
230            .with_analyzer_rules(analyzer.rules)
231            .with_serializer_registry(Arc::new(DefaultSerializer))
232            .with_query_planner(Arc::new(DfQueryPlanner::new(
233                catalog_list.clone(),
234                partition_rule_manager,
235                region_query_handler.clone(),
236            )))
237            .with_optimizer_rules(optimizer.rules)
238            .with_physical_optimizer_rules(physical_optimizer.rules)
239            .build();
240
241        let df_context = SessionContext::new_with_state(session_state);
242        register_function_aliases(&df_context);
243
244        Self {
245            df_context,
246            catalog_manager: catalog_list,
247            dyn_filter_registry_manager: Arc::new(DynFilterRegistryManager::default()),
248            function_state: Arc::new(FunctionState {
249                table_mutation_handler,
250                procedure_service_handler,
251                flow_service_handler,
252            }),
253            aggr_functions: Arc::new(RwLock::new(HashMap::new())),
254            table_functions: Arc::new(RwLock::new(HashMap::new())),
255            extension_rules,
256            plugins,
257            scalar_functions: Arc::new(RwLock::new(HashMap::new())),
258        }
259    }
260
261    fn remove_physical_optimizer_rule(
262        rules: &mut Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
263        name: &str,
264    ) {
265        rules.retain(|rule| rule.name() != name);
266    }
267
268    /// Optimize the logical plan by the extension analyzer rules.
269    pub fn optimize_by_extension_rules(
270        &self,
271        plan: DfLogicalPlan,
272        context: &QueryEngineContext,
273    ) -> DfResult<DfLogicalPlan> {
274        self.extension_rules
275            .iter()
276            .try_fold(plan, |acc_plan, rule| {
277                rule.analyze(acc_plan, context, self.session_state().config_options())
278            })
279    }
280
281    /// Run the full logical plan optimize phase for the given plan.
282    pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
283        self.session_state().optimize(&plan)
284    }
285
286    /// Retrieve the scalar function by name
287    pub fn scalar_function(&self, function_name: &str) -> Option<ScalarFunctionFactory> {
288        self.scalar_functions
289            .read()
290            .unwrap()
291            .get(function_name)
292            .cloned()
293    }
294
295    /// Retrieve scalar function names.
296    pub fn scalar_names(&self) -> Vec<String> {
297        self.scalar_functions
298            .read()
299            .unwrap()
300            .keys()
301            .cloned()
302            .collect()
303    }
304
305    /// Retrieve the aggregate function by name
306    pub fn aggr_function(&self, function_name: &str) -> Option<AggregateUDF> {
307        self.aggr_functions
308            .read()
309            .unwrap()
310            .get(function_name)
311            .cloned()
312    }
313
314    /// Retrieve aggregate function names.
315    pub fn aggr_names(&self) -> Vec<String> {
316        self.aggr_functions
317            .read()
318            .unwrap()
319            .keys()
320            .cloned()
321            .collect()
322    }
323
324    /// Retrieve table function by name
325    pub fn table_function(&self, function_name: &str) -> Option<Arc<TableFunction>> {
326        self.table_functions
327            .read()
328            .unwrap()
329            .get(function_name)
330            .cloned()
331    }
332
333    /// Retrieve table function names.
334    pub fn table_function_names(&self) -> Vec<String> {
335        self.table_functions
336            .read()
337            .unwrap()
338            .keys()
339            .cloned()
340            .collect()
341    }
342
343    /// Register an scalar function.
344    /// Will override if the function with same name is already registered.
345    pub fn register_scalar_function(&self, func: ScalarFunctionFactory) {
346        let name = func.name().to_string();
347        let x = self
348            .scalar_functions
349            .write()
350            .unwrap()
351            .insert(name.clone(), func);
352
353        if x.is_some() {
354            warn!("Already registered scalar function '{name}'");
355        }
356    }
357
358    /// Register an aggregate function.
359    ///
360    /// # Panics
361    /// Will panic if the function with same name is already registered.
362    ///
363    /// Panicking consideration: currently the aggregated functions are all statically registered,
364    /// user cannot define their own aggregate functions on the fly. So we can panic here. If that
365    /// invariant is broken in the future, we should return an error instead of panicking.
366    pub fn register_aggr_function(&self, func: AggregateUDF) {
367        let name = func.name().to_string();
368        let x = self
369            .aggr_functions
370            .write()
371            .unwrap()
372            .insert(name.clone(), func);
373        assert!(
374            x.is_none(),
375            "Already registered aggregate function '{name}'"
376        );
377    }
378
379    pub fn register_table_function(&self, func: Arc<TableFunction>) {
380        let name = func.name();
381        let x = self
382            .table_functions
383            .write()
384            .unwrap()
385            .insert(name.to_string(), func.clone());
386
387        if x.is_some() {
388            warn!("Already registered table function '{name}'");
389        }
390    }
391
392    /// Register a window function (UDWF) directly on the DataFusion SessionContext.
393    ///
394    /// This makes the function visible via `session_state.window_functions()`,
395    /// which is used by `DfContextProviderAdapter::get_window_meta`.
396    pub fn register_window_function(&self, func: WindowUDF) {
397        self.df_context.register_udwf(func);
398    }
399
400    pub fn catalog_manager(&self) -> &CatalogManagerRef {
401        &self.catalog_manager
402    }
403
404    pub fn dyn_filter_registry_manager(&self) -> Arc<DynFilterRegistryManager> {
405        self.dyn_filter_registry_manager.clone()
406    }
407
408    pub fn acquire_remote_dyn_filter_registry_lease(
409        &self,
410        query_ctx: &QueryContextRef,
411    ) -> Option<RemoteDynFilterRegistryLease> {
412        let query_id = query_ctx.remote_query_id_value()?;
413        Some(
414            self.dyn_filter_registry_manager
415                .clone()
416                .acquire_lease(query_id),
417        )
418    }
419
420    pub fn function_state(&self) -> Arc<FunctionState> {
421        self.function_state.clone()
422    }
423
424    /// Returns the [`TableMutationHandlerRef`] in state.
425    pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
426        self.function_state.table_mutation_handler.as_ref()
427    }
428
429    /// Returns the [`ProcedureServiceHandlerRef`] in state.
430    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
431        self.function_state.procedure_service_handler.as_ref()
432    }
433
434    pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
435        self.plugins
436            .map::<QueryOptions, _, _>(|x| x.disallow_cross_catalog_query)
437            .unwrap_or(false)
438    }
439
440    pub fn session_state(&self) -> SessionState {
441        self.df_context.state()
442    }
443
444    /// Create a DataFrame for a table
445    pub fn read_table(&self, table: TableRef) -> DfResult<DataFrame> {
446        self.df_context
447            .read_table(Arc::new(DfTableProviderAdapter::new(table)))
448    }
449}
450
/// [`QueryPlanner`] implementation that forwards physical planning to a
/// [`DefaultPhysicalPlanner`] set up with the extension planners chosen in
/// `DfQueryPlanner::new`.
struct DfQueryPlanner {
    /// Planner that every `create_physical_plan` call is delegated to.
    physical_planner: DefaultPhysicalPlanner,
}
454
455impl fmt::Debug for DfQueryPlanner {
456    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
457        f.debug_struct("DfQueryPlanner").finish()
458    }
459}
460
461#[async_trait]
462impl QueryPlanner for DfQueryPlanner {
463    async fn create_physical_plan(
464        &self,
465        logical_plan: &DfLogicalPlan,
466        session_state: &SessionState,
467    ) -> DfResult<Arc<dyn ExecutionPlan>> {
468        self.physical_planner
469            .create_physical_plan(logical_plan, session_state)
470            .await
471    }
472}
473
/// MySQL-compatible scalar function aliases: (target_name, alias)
///
/// `target_name` is the scalar function looked up in the session state and `alias` is the
/// additional name attached to it by `register_function_aliases`.
const SCALAR_FUNCTION_ALIASES: &[(&str, &str)] = &[
    ("upper", "ucase"),
    ("lower", "lcase"),
    ("ceil", "ceiling"),
    ("substr", "mid"),
    ("random", "rand"),
];
482
/// MySQL-compatible aggregate function aliases: (target_name, alias)
///
/// `target_name` is the aggregate function looked up in the session state and `alias` is the
/// additional name attached to it by `register_function_aliases`.
const AGGREGATE_FUNCTION_ALIASES: &[(&str, &str)] =
    &[("stddev_pop", "std"), ("var_pop", "variance")];
486
487/// Register function aliases.
488///
489/// This function adds aliases like `ucase` -> `upper`, `lcase` -> `lower`, etc.
490/// to make GreptimeDB more compatible with MySQL syntax.
491fn register_function_aliases(ctx: &SessionContext) {
492    let state = ctx.state();
493
494    for (target, alias) in SCALAR_FUNCTION_ALIASES {
495        if let Some(func) = state.scalar_functions().get(*target) {
496            let aliased = func.as_ref().clone().with_aliases([*alias]);
497            ctx.register_udf(aliased);
498        }
499    }
500
501    for (target, alias) in AGGREGATE_FUNCTION_ALIASES {
502        if let Some(func) = state.aggregate_functions().get(*target) {
503            let aliased = func.as_ref().clone().with_aliases([*alias]);
504            ctx.register_udaf(aliased);
505        }
506    }
507}
508
509impl DfQueryPlanner {
510    fn new(
511        catalog_manager: CatalogManagerRef,
512        partition_rule_manager: Option<PartitionRuleManagerRef>,
513        region_query_handler: Option<RegionQueryHandlerRef>,
514    ) -> Self {
515        let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![
516            Arc::new(PromExtensionPlanner),
517            Arc::new(RangeSelectPlanner),
518            Arc::new(RemoteDynFilterReceiverExtensionPlanner),
519        ];
520        if let (Some(region_query_handler), Some(partition_rule_manager)) =
521            (region_query_handler, partition_rule_manager)
522        {
523            planners.push(Arc::new(DistExtensionPlanner::new(
524                catalog_manager,
525                partition_rule_manager,
526                region_query_handler,
527            )));
528            planners.push(Arc::new(MergeSortExtensionPlanner {}));
529        }
530        Self {
531            physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
532        }
533    }
534}
535
/// A wrapper around TrackConsumersPool that records metrics.
///
/// All memory pool operations are forwarded to the inner pool. The usage gauge is refreshed
/// after `grow`, `shrink` and `try_grow`, and a failed `try_grow` increments the rejection
/// counter; `register` and `unregister` are forwarded without touching any metric.
#[derive(Debug)]
struct MetricsMemoryPool {
    /// The wrapped pool that performs the actual accounting.
    inner: Arc<TrackConsumersPool<GreedyMemoryPool>>,
}
544
545impl MetricsMemoryPool {
546    // Number of top memory consumers to report in OOM error messages
547    const TOP_CONSUMERS_TO_REPORT: usize = 5;
548
549    fn new(limit: usize) -> Self {
550        Self {
551            inner: Arc::new(TrackConsumersPool::new(
552                GreedyMemoryPool::new(limit),
553                NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(),
554            )),
555        }
556    }
557
558    #[inline]
559    fn update_metrics(&self) {
560        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
561    }
562}
563
564impl MemoryPool for MetricsMemoryPool {
565    fn register(&self, consumer: &MemoryConsumer) {
566        self.inner.register(consumer);
567    }
568
569    fn unregister(&self, consumer: &MemoryConsumer) {
570        self.inner.unregister(consumer);
571    }
572
573    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
574        self.inner.grow(reservation, additional);
575        self.update_metrics();
576    }
577
578    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
579        self.inner.shrink(reservation, shrink);
580        self.update_metrics();
581    }
582
583    fn try_grow(
584        &self,
585        reservation: &MemoryReservation,
586        additional: usize,
587    ) -> datafusion_common::Result<()> {
588        let result = self.inner.try_grow(reservation, additional);
589        if result.is_err() {
590            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
591        }
592        self.update_metrics();
593        result
594    }
595
596    fn reserved(&self) -> usize {
597        self.inner.reserved()
598    }
599
600    fn memory_limit(&self) -> MemoryLimit {
601        self.inner.memory_limit()
602    }
603}
604
#[cfg(test)]
mod tests {
    use common_base::Plugins;
    use session::context::QueryContext;

    use super::*;
    // This explicit import takes precedence over the `QueryOptions` brought in by the glob
    // import above, so `QueryOptions` below names `crate::options::QueryOptions`, which is the
    // options type `QueryEngineState::new` accepts.
    use crate::options::QueryOptions;

    /// Builds a state on top of an in-memory catalog manager, with no optional handlers,
    /// without the dist planner, and with default plugins and options.
    fn new_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            catalog::memory::new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }

    /// Acquiring a lease twice with the same query context must yield the same registry and
    /// leave exactly one registry in the manager.
    #[test]
    fn query_engine_state_reuses_query_scoped_dyn_filter_registry_lease() {
        let state = new_query_engine_state();
        let query_ctx = QueryContext::arc();

        let first = state
            .acquire_remote_dyn_filter_registry_lease(&query_ctx)
            .unwrap();
        let second = state
            .acquire_remote_dyn_filter_registry_lease(&query_ctx)
            .unwrap();

        assert!(first.ptr_eq(&second));
        assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
        assert_eq!(
            first.registry().query_id(),
            query_ctx.remote_query_id_value().unwrap()
        );
    }

    /// Pins the contract that a context from `QueryContext::arc()` carries a remote query id,
    /// and that the acquired lease's registry is keyed by that id.
    #[test]
    fn query_engine_state_relies_on_query_context_remote_query_id_contract() {
        let state = new_query_engine_state();
        let query_ctx = QueryContext::arc();

        assert!(query_ctx.remote_query_id_value().is_some());

        let lease = state
            .acquire_remote_dyn_filter_registry_lease(&query_ctx)
            .unwrap();

        assert_eq!(
            lease.registry().query_id(),
            query_ctx.remote_query_id_value().unwrap()
        );
        assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
    }

    /// Two distinct query contexts must get distinct registries, each keyed by its own
    /// remote query id.
    #[test]
    fn query_engine_state_separates_registries_for_different_query_contexts() {
        let state = new_query_engine_state();
        let first_query_ctx = QueryContext::arc();
        let second_query_ctx = QueryContext::arc();

        let first = state
            .acquire_remote_dyn_filter_registry_lease(&first_query_ctx)
            .unwrap();
        let second = state
            .acquire_remote_dyn_filter_registry_lease(&second_query_ctx)
            .unwrap();

        assert!(!first.ptr_eq(&second));
        assert_eq!(state.dyn_filter_registry_manager().registry_count(), 2);
        assert_eq!(
            first.registry().query_id(),
            first_query_ctx.remote_query_id_value().unwrap()
        );
        assert_eq!(
            second.registry().query_id(),
            second_query_ctx.remote_query_id_value().unwrap()
        );
    }
}