Skip to main content

query/dist_plan/
dyn_filter_bridge.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_query::request::{
19    DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
20    INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES, InitialDynFilterReg,
21    InitialDynFilterRegs, InitialDynFilterSnapshot,
22};
23use datafusion_common::Result;
24use datafusion_physical_expr::PhysicalExpr;
25use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
26use session::context::{QueryContext, QueryContextRef};
27use store_api::storage::RegionId;
28
29use crate::dist_plan::filter_id::build_remote_dyn_filter_id;
30use crate::dist_plan::{FilterId, QueryDynFilterRegistry, RemoteDynFilterProducerId, Subscriber};
31
32#[derive(Debug, Clone)]
33pub(crate) struct CapturedDynFilter {
34    filter_id: FilterId,
35    initial_registration: InitialDynFilterReg,
36    pub(crate) alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
37}
38
39#[derive(Debug, Clone)]
40pub(crate) struct RemoteDynFilterPushdown {
41    pub(crate) captured_dyn_filters: Vec<CapturedDynFilter>,
42    /// Preflight result per parent filter.
43    pub(crate) pushed_down: Vec<bool>,
44}
45
46pub(crate) fn capture_remote_dyn_filters_for_pushdown(
47    remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
48    parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
49) -> RemoteDynFilterPushdown {
50    let mut pushed_down = Vec::with_capacity(parent_filters.len());
51    let mut captured_dyn_filters = Vec::new();
52
53    for (producer_local_ordinal, filter) in parent_filters.into_iter().enumerate() {
54        let Some(alive_dyn_filter) = downcast_dynamic_filter(filter) else {
55            pushed_down.push(false);
56            continue;
57        };
58
59        match build_captured_dyn_filter(
60            remote_dyn_filter_producer_id,
61            producer_local_ordinal,
62            alive_dyn_filter,
63        ) {
64            Ok(captured_dyn_filter) => {
65                pushed_down.push(true);
66                captured_dyn_filters.push(captured_dyn_filter);
67            }
68            Err(error) => {
69                common_telemetry::warn!(error; "Remote dyn filter is not pushed down because initial registration cannot be built");
70                pushed_down.push(false);
71            }
72        }
73    }
74
75    if let Err(error) = validate_initial_registrations_for_pushdown(&captured_dyn_filters) {
76        common_telemetry::warn!(error; "Remote dyn filters are not pushed down because initial registrations are invalid");
77        return RemoteDynFilterPushdown {
78            captured_dyn_filters: Vec::new(),
79            pushed_down: vec![false; pushed_down.len()],
80        };
81    }
82
83    RemoteDynFilterPushdown {
84        captured_dyn_filters,
85        pushed_down,
86    }
87}
88
89fn downcast_dynamic_filter(
90    expr: Arc<dyn datafusion::physical_plan::PhysicalExpr>,
91) -> Option<Arc<DynamicFilterPhysicalExpr>> {
92    (expr as Arc<dyn Any + Send + Sync + 'static>)
93        .downcast::<DynamicFilterPhysicalExpr>()
94        .ok()
95}
96
97pub(crate) fn register_dyn_filters_for_region(
98    registry: &QueryDynFilterRegistry,
99    region_id: RegionId,
100    captured_dyn_filters: &[CapturedDynFilter],
101) {
102    for captured_dyn_filter in captured_dyn_filters {
103        let _ = registry.register_remote_dyn_filter(
104            captured_dyn_filter.filter_id.clone(),
105            captured_dyn_filter.alive_dyn_filter.clone(),
106        );
107        let _ = registry
108            .register_subscriber(&captured_dyn_filter.filter_id, Subscriber::new(region_id));
109    }
110}
111
112fn build_captured_dyn_filter(
113    remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
114    producer_local_ordinal: usize,
115    alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
116) -> Result<CapturedDynFilter> {
117    let children = alive_dyn_filter
118        .children()
119        .into_iter()
120        .cloned()
121        .collect::<Vec<_>>();
122    let filter_id = build_remote_dyn_filter_id(
123        remote_dyn_filter_producer_id,
124        producer_local_ordinal,
125        &children,
126    )?;
127    let initial_registration =
128        InitialDynFilterReg::from_filter_id_and_children(filter_id.to_string(), &children)?;
129
130    Ok(CapturedDynFilter {
131        filter_id,
132        initial_registration: attach_initial_snapshot(initial_registration, &alive_dyn_filter),
133        alive_dyn_filter,
134    })
135}
136
137fn validate_initial_registrations_for_pushdown(
138    captured_dyn_filters: &[CapturedDynFilter],
139) -> std::result::Result<(), String> {
140    let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
141    regs.validate_default_bounds()?;
142    regs.to_extension_value()
143        .map_err(|error| error.to_string())?;
144    Ok(())
145}
146
147fn attach_initial_snapshot(
148    initial_registration: InitialDynFilterReg,
149    alive_dyn_filter: &DynamicFilterPhysicalExpr,
150) -> InitialDynFilterReg {
151    let Some(initial_snapshot) = initial_snapshot(alive_dyn_filter) else {
152        return initial_registration;
153    };
154
155    initial_registration.with_initial_snapshot(initial_snapshot)
156}
157
158fn initial_snapshot(
159    alive_dyn_filter: &DynamicFilterPhysicalExpr,
160) -> Option<InitialDynFilterSnapshot> {
161    let generation = alive_dyn_filter.snapshot_generation();
162    let current = match alive_dyn_filter.current() {
163        Ok(current) => current,
164        Err(error) => {
165            common_telemetry::warn!(error; "Failed to read remote dyn filter initial snapshot");
166            return None;
167        }
168    };
169
170    let payload = match DynFilterPayload::from_datafusion_expr(
171        &current,
172        INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES,
173    ) {
174        Ok(payload) => payload,
175        Err(error) => {
176            common_telemetry::warn!(error; "Failed to encode remote dyn filter initial snapshot");
177            return None;
178        }
179    };
180
181    // Current DataFusion exposes `wait_complete()`, but no non-blocking completion getter.
182    let is_complete = false;
183    Some(InitialDynFilterSnapshot::new(
184        payload,
185        generation,
186        is_complete,
187    ))
188}
189
190fn build_initial_dyn_filter_regs_for_region(
191    captured_dyn_filters: &[CapturedDynFilter],
192) -> InitialDynFilterRegs {
193    InitialDynFilterRegs::new(
194        captured_dyn_filters
195            .iter()
196            .map(|captured| captured.initial_registration.clone())
197            .collect(),
198    )
199}
200
201pub(crate) fn query_context_with_initial_dyn_filter_regs(
202    query_ctx: &QueryContextRef,
203    region_id: RegionId,
204    captured_dyn_filters: &[CapturedDynFilter],
205) -> QueryContext {
206    let mut region_query_ctx = query_ctx.as_ref().clone();
207    let regs = build_initial_dyn_filter_regs_for_region(captured_dyn_filters);
208    if regs.is_empty() {
209        return region_query_ctx;
210    }
211
212    if let Err(error) = regs.validate_default_bounds() {
213        common_telemetry::warn!(error; "Dropping initial remote dyn filter registrations for region {} that exceed configured bounds", region_id);
214        return region_query_ctx;
215    }
216
217    match regs.to_extension_value() {
218        Ok(serialized) => region_query_ctx.set_extension(
219            INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
220            serialized,
221        ),
222        Err(error) => {
223            common_telemetry::warn!(error; "Failed to serialize initial remote dyn filter registrations");
224        }
225    }
226
227    region_query_ctx
228}
229
230#[cfg(test)]
231mod tests {
232    use std::fmt;
233    use std::hash::{Hash, Hasher};
234
235    use datafusion::execution::TaskContext;
236    use datafusion_common::ScalarValue;
237    use datafusion_expr::ColumnarValue;
238    use datafusion_physical_expr::expressions::{Column, lit};
239    use session::query_id::QueryId;
240    use uuid::Uuid;
241
242    use super::*;
243
244    #[derive(Debug)]
245    struct UnserializableExpr;
246
247    impl fmt::Display for UnserializableExpr {
248        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249            write!(f, "unserializable_expr")
250        }
251    }
252
253    impl Hash for UnserializableExpr {
254        fn hash<H: Hasher>(&self, state: &mut H) {
255            "unserializable_expr".hash(state);
256        }
257    }
258
259    impl PartialEq for UnserializableExpr {
260        fn eq(&self, _other: &Self) -> bool {
261            true
262        }
263    }
264
265    impl Eq for UnserializableExpr {}
266
267    impl datafusion_physical_expr::PhysicalExpr for UnserializableExpr {
268        fn as_any(&self) -> &dyn Any {
269            self
270        }
271
272        fn data_type(
273            &self,
274            _input_schema: &arrow_schema::Schema,
275        ) -> datafusion_common::Result<arrow_schema::DataType> {
276            Ok(arrow_schema::DataType::Boolean)
277        }
278
279        fn nullable(
280            &self,
281            _input_schema: &arrow_schema::Schema,
282        ) -> datafusion_common::Result<bool> {
283            Ok(false)
284        }
285
286        fn evaluate(
287            &self,
288            _batch: &common_recordbatch::DfRecordBatch,
289        ) -> datafusion_common::Result<ColumnarValue> {
290            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))))
291        }
292
293        fn children(&self) -> Vec<&Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
294            Vec::new()
295        }
296
297        fn with_new_children(
298            self: Arc<Self>,
299            _children: Vec<Arc<dyn datafusion_physical_expr::PhysicalExpr>>,
300        ) -> datafusion_common::Result<Arc<dyn datafusion_physical_expr::PhysicalExpr>> {
301            Ok(self)
302        }
303
304        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305            write!(f, "{self}")
306        }
307    }
308
309    fn test_query_id(value: u128) -> QueryId {
310        QueryId::from(Uuid::from_u128(value))
311    }
312
313    fn test_remote_dyn_filter_producer_id(value: u64) -> RemoteDynFilterProducerId {
314        RemoteDynFilterProducerId::new(value)
315    }
316
317    fn test_captured_dyn_filter(
318        remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
319        producer_local_ordinal: usize,
320        column_name: &str,
321        column_index: usize,
322    ) -> CapturedDynFilter {
323        build_captured_dyn_filter(
324            remote_dyn_filter_producer_id,
325            producer_local_ordinal,
326            Arc::new(DynamicFilterPhysicalExpr::new(
327                vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
328                lit(true) as _,
329            )),
330        )
331        .unwrap()
332    }
333
334    fn test_dyn_filter_with_snapshot_payload(
335        column_name: &str,
336        column_index: usize,
337        payload_bytes: usize,
338    ) -> Arc<DynamicFilterPhysicalExpr> {
339        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
340            vec![Arc::new(Column::new(column_name, column_index)) as Arc<_>],
341            lit(true) as _,
342        ));
343        dyn_filter
344            .update(lit(ScalarValue::Utf8(Some("x".repeat(payload_bytes)))) as _)
345            .unwrap();
346        dyn_filter
347    }
348
349    #[test]
350    fn capture_remote_dyn_filters_for_pushdown_preserves_parent_filter_ordinals() {
351        let parent_filters = vec![
352            Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
353            Arc::new(DynamicFilterPhysicalExpr::new(
354                vec![Arc::new(Column::new("host", 1)) as Arc<_>],
355                lit(true) as _,
356            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
357            Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
358            Arc::new(DynamicFilterPhysicalExpr::new(
359                vec![Arc::new(Column::new("pod", 3)) as Arc<_>],
360                lit(true) as _,
361            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
362        ];
363
364        let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
365        let captured =
366            capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters)
367                .captured_dyn_filters;
368
369        assert_eq!(captured.len(), 2);
370        assert_eq!(
371            captured[0].filter_id.remote_dyn_filter_producer_id(),
372            remote_dyn_filter_producer_id
373        );
374        assert_eq!(
375            captured[1].filter_id.remote_dyn_filter_producer_id(),
376            remote_dyn_filter_producer_id
377        );
378        assert_eq!(captured[0].filter_id.producer_ordinal(), 1);
379        assert_eq!(captured[1].filter_id.producer_ordinal(), 3);
380    }
381
382    #[test]
383    fn capture_remote_dyn_filters_for_pushdown_marks_only_valid_initial_regs() {
384        let parent_filters = vec![
385            Arc::new(Column::new("service", 0)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
386            Arc::new(DynamicFilterPhysicalExpr::new(
387                vec![Arc::new(Column::new("host", 1)) as Arc<_>],
388                lit(true) as _,
389            )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
390            Arc::new(Column::new("zone", 2)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
391        ];
392
393        let remote_dyn_filter_producer_id = test_remote_dyn_filter_producer_id(42);
394        let pushdown =
395            capture_remote_dyn_filters_for_pushdown(remote_dyn_filter_producer_id, parent_filters);
396
397        assert_eq!(pushdown.pushed_down, vec![false, true, false]);
398        assert_eq!(pushdown.captured_dyn_filters.len(), 1);
399        assert_eq!(
400            pushdown.captured_dyn_filters[0]
401                .filter_id
402                .remote_dyn_filter_producer_id(),
403            remote_dyn_filter_producer_id
404        );
405        assert_eq!(
406            pushdown.captured_dyn_filters[0]
407                .filter_id
408                .producer_ordinal(),
409            1
410        );
411        assert!(
412            pushdown.captured_dyn_filters[0]
413                .initial_registration
414                .initial_snapshot
415                .is_some()
416        );
417    }
418
419    #[test]
420    fn capture_remote_dyn_filters_for_pushdown_rejects_unencodable_registration() {
421        let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new(
422            vec![Arc::new(UnserializableExpr) as Arc<_>],
423            lit(true) as _,
424        ))
425            as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
426
427        let pushdown = capture_remote_dyn_filters_for_pushdown(
428            test_remote_dyn_filter_producer_id(42),
429            parent_filters,
430        );
431
432        assert_eq!(pushdown.pushed_down, vec![false]);
433        assert!(pushdown.captured_dyn_filters.is_empty());
434    }
435
436    #[test]
437    fn capture_remote_dyn_filters_for_pushdown_attaches_initial_snapshot() {
438        let parent_filters = vec![Arc::new(DynamicFilterPhysicalExpr::new(
439            vec![Arc::new(Column::new("host", 1)) as Arc<_>],
440            lit(true) as _,
441        ))
442            as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
443
444        let pushdown = capture_remote_dyn_filters_for_pushdown(
445            test_remote_dyn_filter_producer_id(42),
446            parent_filters,
447        );
448
449        assert_eq!(pushdown.pushed_down, vec![true]);
450        assert!(
451            pushdown.captured_dyn_filters[0]
452                .initial_registration
453                .initial_snapshot
454                .is_some()
455        );
456    }
457
458    #[test]
459    fn capture_remote_dyn_filters_for_pushdown_attaches_initial_snapshot_after_update() {
460        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
461            vec![Arc::new(Column::new("host", 1)) as Arc<_>],
462            lit(true) as _,
463        ));
464        dyn_filter.update(lit(false) as _).unwrap();
465        let parent_filters = vec![dyn_filter as Arc<dyn datafusion::physical_plan::PhysicalExpr>];
466
467        let pushdown = capture_remote_dyn_filters_for_pushdown(
468            test_remote_dyn_filter_producer_id(42),
469            parent_filters,
470        );
471
472        assert_eq!(pushdown.pushed_down, vec![true]);
473        let snapshot = pushdown.captured_dyn_filters[0]
474            .initial_registration
475            .initial_snapshot
476            .as_ref()
477            .unwrap();
478        assert_eq!(snapshot.generation, 2);
479        assert!(!snapshot.is_complete);
480        assert!(matches!(
481            snapshot.payload,
482            DynFilterPayload::Datafusion(ref bytes) if !bytes.is_empty()
483        ));
484    }
485
486    #[test]
487    fn capture_remote_dyn_filters_for_pushdown_rejects_oversized_snapshots() {
488        let parent_filters = vec![
489            test_dyn_filter_with_snapshot_payload("host", 0, 40 * 1024)
490                as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
491            test_dyn_filter_with_snapshot_payload("pod", 1, 40 * 1024)
492                as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
493        ];
494
495        let pushdown = capture_remote_dyn_filters_for_pushdown(
496            test_remote_dyn_filter_producer_id(42),
497            parent_filters,
498        );
499
500        assert_eq!(pushdown.pushed_down, vec![false, false]);
501        assert!(pushdown.captured_dyn_filters.is_empty());
502    }
503
504    #[test]
505    fn capture_remote_dyn_filters_for_pushdown_rejects_too_many_regs_with_snapshots() {
506        const TOO_MANY_INITIAL_REGS: usize = 65;
507
508        let parent_filters = (0..TOO_MANY_INITIAL_REGS)
509            .map(|ordinal| {
510                test_dyn_filter_with_snapshot_payload(&format!("host_{ordinal}"), ordinal, 1)
511                    as Arc<dyn datafusion::physical_plan::PhysicalExpr>
512            })
513            .collect::<Vec<_>>();
514
515        let pushdown = capture_remote_dyn_filters_for_pushdown(
516            test_remote_dyn_filter_producer_id(42),
517            parent_filters,
518        );
519
520        assert!(pushdown.captured_dyn_filters.is_empty());
521        assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
522    }
523
524    #[test]
525    fn capture_remote_dyn_filters_for_pushdown_rejects_regs_exceeding_bounds() {
526        const TOO_MANY_INITIAL_REGS: usize = 65;
527
528        let parent_filters = (0..TOO_MANY_INITIAL_REGS)
529            .map(|_| {
530                Arc::new(DynamicFilterPhysicalExpr::new(
531                    vec![Arc::new(Column::new("host", 0)) as Arc<_>],
532                    lit(true) as _,
533                )) as Arc<dyn datafusion::physical_plan::PhysicalExpr>
534            })
535            .collect::<Vec<_>>();
536
537        let pushdown = capture_remote_dyn_filters_for_pushdown(
538            test_remote_dyn_filter_producer_id(42),
539            parent_filters,
540        );
541
542        assert!(pushdown.captured_dyn_filters.is_empty());
543        assert_eq!(pushdown.pushed_down, vec![false; TOO_MANY_INITIAL_REGS]);
544    }
545
546    #[test]
547    fn register_dyn_filters_for_region_reuses_existing_entry() {
548        let registry = QueryDynFilterRegistry::new(test_query_id(1));
549        let captured_dyn_filters = vec![test_captured_dyn_filter(
550            test_remote_dyn_filter_producer_id(42),
551            2,
552            "host",
553            0,
554        )];
555        let first_region_id = RegionId::new(1024, 7);
556        let second_region_id = RegionId::new(1024, 8);
557
558        register_dyn_filters_for_region(&registry, first_region_id, &captured_dyn_filters);
559        register_dyn_filters_for_region(&registry, second_region_id, &captured_dyn_filters);
560
561        assert_eq!(registry.entry_count(), 1);
562        let entry = registry.entries().pop().unwrap();
563        assert_eq!(
564            entry.filter_id().remote_dyn_filter_producer_id(),
565            test_remote_dyn_filter_producer_id(42)
566        );
567        assert_eq!(entry.filter_id().producer_ordinal(), 2);
568        let subscribers = entry.subscribers();
569        assert_eq!(subscribers.len(), 2);
570        assert!(
571            subscribers
572                .iter()
573                .any(|subscriber| subscriber.region_id() == first_region_id)
574        );
575        assert!(
576            subscribers
577                .iter()
578                .any(|subscriber| subscriber.region_id() == second_region_id)
579        );
580    }
581
582    #[test]
583    fn register_dyn_filters_for_region_keeps_independent_producer_ids_distinct() {
584        let registry = QueryDynFilterRegistry::new(test_query_id(1));
585        let region_id = RegionId::new(1024, 7);
586        let make_filter = |remote_dyn_filter_producer_id| {
587            test_captured_dyn_filter(remote_dyn_filter_producer_id, 2, "host", 0)
588        };
589
590        register_dyn_filters_for_region(
591            &registry,
592            region_id,
593            &[make_filter(test_remote_dyn_filter_producer_id(42))],
594        );
595        register_dyn_filters_for_region(
596            &registry,
597            region_id,
598            &[make_filter(test_remote_dyn_filter_producer_id(43))],
599        );
600
601        assert_eq!(registry.entry_count(), 2);
602    }
603
604    #[test]
605    fn query_context_includes_region_initial_dyn_filter_regs() {
606        let captured_dyn_filters = vec![test_captured_dyn_filter(
607            test_remote_dyn_filter_producer_id(42),
608            2,
609            "host",
610            0,
611        )];
612        let region_id = RegionId::new(1024, 7);
613        let query_ctx = QueryContext::arc();
614
615        let region_query_ctx = query_context_with_initial_dyn_filter_regs(
616            &query_ctx,
617            region_id,
618            &captured_dyn_filters,
619        );
620        let extension = region_query_ctx
621            .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
622            .unwrap();
623        let regs = InitialDynFilterRegs::from_extension_value(extension).unwrap();
624        let decoded_children = regs.regs[0]
625            .decode_children(
626                &TaskContext::default(),
627                &arrow_schema::Schema::new(vec![arrow_schema::Field::new(
628                    "host",
629                    arrow_schema::DataType::Utf8,
630                    false,
631                )]),
632                1024,
633            )
634            .unwrap();
635        assert_eq!(regs.regs.len(), 1);
636        assert_eq!(
637            regs.regs[0].filter_id,
638            captured_dyn_filters[0].filter_id.to_string()
639        );
640        assert_eq!(decoded_children.len(), 1);
641        assert!(decoded_children[0].as_any().is::<Column>());
642    }
643
644    #[test]
645    fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() {
646        let captured_dyn_filters = vec![
647            test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
648            test_captured_dyn_filter(test_remote_dyn_filter_producer_id(42), 2, "host", 0),
649        ];
650        let region_id = RegionId::new(1024, 7);
651        let query_ctx = QueryContext::arc();
652
653        let region_query_ctx = query_context_with_initial_dyn_filter_regs(
654            &query_ctx,
655            region_id,
656            &captured_dyn_filters,
657        );
658
659        assert!(
660            region_query_ctx
661                .extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
662                .is_none()
663        );
664    }
665}