Skip to main content

datanode/region_server/
remote_dyn_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::{Arc, Weak};
17use std::task::{Context, Poll};
18
19use api::region::RegionResponse;
20use api::v1::region::RemoteDynFilterRequest;
21use api::v1::region::remote_dyn_filter_request::Action;
22use common_base::Plugins;
23use common_recordbatch::adapter::RecordBatchMetrics;
24use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
25use common_telemetry::{debug, warn};
26use datafusion_expr::LogicalPlan;
27use futures_util::Stream;
28use query::dist_plan::{
29    RemoteDynFilterReceiverInjector, RemoteDynFilterReceiverInjectorRef,
30    RemoteDynFilterReceiverLogicalPlan,
31};
32use query::options::remote_dyn_filter_pushdown_enabled_from_extensions;
33use query::promql::plan_contains_promql_extension;
34use session::context::QueryContextRef;
35use session::query_id::QueryId;
36use snafu::OptionExt;
37use store_api::storage::RegionId;
38
39use crate::error::{self, Result, UnexpectedSnafu};
40use crate::region_server::registrations::{
41    RemoteDynFilterId, RemoteDynFilterUpdateOutcome, apply_remote_dyn_filter_update,
42    initial_dyn_filter_regs_from_query_ctx, register_initial_dyn_filter_regs,
43    remote_dyn_filter_exprs_for_initial_regs, remove_initial_dyn_filter_regs,
44    unregister_remote_dyn_filter,
45};
46use crate::region_server::{RegionServer, RegionServerInner};
47
48fn remote_dyn_filter_receiver_plan(
49    server: &Weak<RegionServerInner>,
50    origin: LogicalPlan,
51    ctx: QueryContextRef,
52) -> LogicalPlan {
53    if !remote_dyn_filter_pushdown_enabled(&ctx) {
54        return origin;
55    }
56
57    if plan_contains_promql_extension(&origin) {
58        return origin;
59    }
60
61    let Some(query_id) = ctx.remote_query_id_value() else {
62        return origin;
63    };
64
65    let Some(initial_regs) = initial_dyn_filter_regs_from_query_ctx(&ctx) else {
66        return origin;
67    };
68
69    let Some(server) = server.upgrade() else {
70        return origin;
71    };
72
73    let dyn_filters = remote_dyn_filter_exprs_for_initial_regs(
74        &server.initial_remote_dyn_filter_registrations,
75        &query_id,
76        &initial_regs,
77        origin.schema().as_arrow(),
78    );
79
80    if dyn_filters.is_empty() {
81        return origin;
82    }
83
84    RemoteDynFilterReceiverLogicalPlan::new(origin, dyn_filters).into_logical_plan()
85}
86
87fn remote_dyn_filter_pushdown_enabled(query_ctx: &QueryContextRef) -> bool {
88    match remote_dyn_filter_pushdown_enabled_from_extensions(&query_ctx.extensions()) {
89        Ok(enabled) => enabled,
90        Err(error) => {
91            warn!(error; "Remote dynamic filter pushdown is disabled because the query option is invalid");
92            false
93        }
94    }
95}
96
97impl RegionServer {
98    pub fn install_remote_dyn_filter_receiver_injector(&self, plugins: &Plugins) {
99        let server = Arc::downgrade(&self.inner);
100        plugins.insert::<RemoteDynFilterReceiverInjectorRef>(Arc::new(
101            RemoteDynFilterReceiverInjector::new(move |origin, ctx| {
102                remote_dyn_filter_receiver_plan(&server, origin, ctx)
103            }),
104        ));
105    }
106
107    pub(super) fn register_initial_remote_dyn_filter_cleanup(
108        &self,
109        query_ctx: &QueryContextRef,
110        region_id: RegionId,
111    ) -> Option<RemoteDynFilterRegistrationGuard> {
112        if !remote_dyn_filter_pushdown_enabled(query_ctx) {
113            return None;
114        }
115
116        let initial_dyn_filter_regs = initial_dyn_filter_regs_from_query_ctx(query_ctx);
117        let query_id = query_ctx.remote_query_id_value();
118        let registered_filter_ids = if let (Some(query_id), Some(regs)) =
119            (query_id.as_ref(), initial_dyn_filter_regs.as_ref())
120        {
121            register_initial_dyn_filter_regs(
122                &self.inner.initial_remote_dyn_filter_registrations,
123                query_id,
124                region_id,
125                regs,
126            )
127        } else {
128            Vec::new()
129        };
130
131        match (query_id, registered_filter_ids.is_empty()) {
132            (Some(query_id), false) => Some(RemoteDynFilterRegistrationGuard::new(
133                self.clone(),
134                query_id,
135                region_id,
136                registered_filter_ids,
137            )),
138            _ => None,
139        }
140    }
141
142    pub(super) async fn handle_remote_dyn_filter_request(
143        &self,
144        request: &RemoteDynFilterRequest,
145    ) -> Result<RegionResponse> {
146        if request.query_id.is_empty() {
147            return error::MissingRequiredFieldSnafu { name: "query_id" }.fail();
148        }
149
150        let query_id = request.query_id.parse::<QueryId>().map_err(|_| {
151            UnexpectedSnafu {
152                violated: "remote dynamic filter query_id must be a valid QueryId",
153            }
154            .build()
155        })?;
156
157        match request
158            .action
159            .as_ref()
160            .context(error::MissingRequiredFieldSnafu { name: "action" })?
161        {
162            Action::Update(update) => {
163                self.handle_remote_dyn_filter_update(&query_id, update)
164                    .await
165            }
166            Action::Unregister(unregister) => {
167                self.handle_remote_dyn_filter_unregister(&query_id, unregister)
168                    .await
169            }
170        }
171    }
172
173    async fn handle_remote_dyn_filter_update(
174        &self,
175        query_id: &QueryId,
176        request: &api::v1::region::RemoteDynFilterUpdate,
177    ) -> Result<RegionResponse> {
178        if request.filter_id.is_empty() {
179            return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
180        }
181
182        if request.payload.is_empty() {
183            return error::MissingRequiredFieldSnafu { name: "payload" }.fail();
184        }
185
186        let filter_id = RemoteDynFilterId::new(request.filter_id.clone());
187        let outcome = apply_remote_dyn_filter_update(
188            &self.inner.initial_remote_dyn_filter_registrations,
189            query_id,
190            &filter_id,
191            &request.payload,
192            request.generation,
193            request.is_complete,
194        );
195        self.log_remote_dyn_filter_update_outcome(query_id, &filter_id, outcome);
196
197        Ok(RegionResponse::new(0))
198    }
199
200    async fn handle_remote_dyn_filter_unregister(
201        &self,
202        query_id: &QueryId,
203        request: &api::v1::region::RemoteDynFilterUnregister,
204    ) -> Result<RegionResponse> {
205        if request.filter_id.is_empty() {
206            return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
207        }
208
209        let filter_id = RemoteDynFilterId::new(request.filter_id.clone());
210        let outcome = unregister_remote_dyn_filter(
211            &self.inner.initial_remote_dyn_filter_registrations,
212            query_id,
213            &filter_id,
214        );
215        self.log_remote_dyn_filter_update_outcome(query_id, &filter_id, outcome);
216
217        Ok(RegionResponse::new(0))
218    }
219
220    fn log_remote_dyn_filter_update_outcome(
221        &self,
222        query_id: &QueryId,
223        filter_id: &RemoteDynFilterId,
224        outcome: RemoteDynFilterUpdateOutcome,
225    ) {
226        if matches!(
227            outcome,
228            RemoteDynFilterUpdateOutcome::MissingRegistration
229                | RemoteDynFilterUpdateOutcome::AlreadyComplete
230                | RemoteDynFilterUpdateOutcome::PayloadTooLarge
231                | RemoteDynFilterUpdateOutcome::DecodeFailed
232        ) {
233            warn!(
234                "Remote dynamic filter update outcome, query_id: {}, filter_id: {}, outcome: {:?}",
235                query_id, filter_id, outcome
236            );
237        } else {
238            debug!(
239                "Remote dynamic filter update outcome, query_id: {}, filter_id: {}, outcome: {:?}",
240                query_id, filter_id, outcome
241            );
242        }
243    }
244}
245
246pub(super) fn wrap_remote_dyn_filter_guarded_stream(
247    stream: SendableRecordBatchStream,
248    cleanup: RemoteDynFilterRegistrationGuard,
249) -> SendableRecordBatchStream {
250    Box::pin(RemoteDynFilterGuardedStream { stream, cleanup })
251}
252
/// Removes query-scoped remote dynamic filter subscriptions unless ownership is moved elsewhere.
///
/// The removal happens at most once: either through an explicit `cleanup_once` call or,
/// failing that, when the guard is dropped.
pub(super) struct RemoteDynFilterRegistrationGuard {
    /// Server whose `initial_remote_dyn_filter_registrations` registry is cleaned up.
    server: RegionServer,
    /// Query the guarded registrations belong to.
    query_id: QueryId,
    /// Region on whose behalf the registrations were made.
    region_id: RegionId,
    /// Filter ids handed to `remove_initial_dyn_filter_regs` during cleanup.
    filter_ids: Vec<RemoteDynFilterId>,
    /// Set once cleanup has run so that it is never repeated.
    cleaned: bool,
}
261
262impl RemoteDynFilterRegistrationGuard {
263    fn new(
264        server: RegionServer,
265        query_id: QueryId,
266        region_id: RegionId,
267        filter_ids: Vec<RemoteDynFilterId>,
268    ) -> Self {
269        Self {
270            server,
271            query_id,
272            region_id,
273            filter_ids,
274            cleaned: false,
275        }
276    }
277
278    fn cleanup_once(&mut self) {
279        if self.cleaned {
280            return;
281        }
282
283        remove_initial_dyn_filter_regs(
284            &self.server.inner.initial_remote_dyn_filter_registrations,
285            &self.query_id,
286            self.region_id,
287            &self.filter_ids,
288        );
289        self.cleaned = true;
290    }
291}
292
293impl Drop for RemoteDynFilterRegistrationGuard {
294    fn drop(&mut self) {
295        self.cleanup_once();
296    }
297}
298
/// Removes query-scoped remote dynamic filter subscriptions when a remote read stream is done.
struct RemoteDynFilterGuardedStream {
    /// The wrapped stream; all stream and metadata calls are forwarded to it.
    stream: SendableRecordBatchStream,
    /// Guard cleaned up once `stream` yields `None`, or when this wrapper is dropped.
    cleanup: RemoteDynFilterRegistrationGuard,
}
304
305impl RecordBatchStream for RemoteDynFilterGuardedStream {
306    fn name(&self) -> &str {
307        self.stream.name()
308    }
309
310    fn schema(&self) -> datatypes::schema::SchemaRef {
311        self.stream.schema()
312    }
313
314    fn output_ordering(&self) -> Option<&[OrderOption]> {
315        self.stream.output_ordering()
316    }
317
318    fn metrics(&self) -> Option<RecordBatchMetrics> {
319        self.stream.metrics()
320    }
321}
322
323impl Stream for RemoteDynFilterGuardedStream {
324    type Item = common_recordbatch::error::Result<RecordBatch>;
325
326    fn size_hint(&self) -> (usize, Option<usize>) {
327        self.stream.size_hint()
328    }
329
330    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
331        match Pin::new(&mut self.stream).poll_next(cx) {
332            Poll::Ready(None) => {
333                self.cleanup.cleanup_once();
334                Poll::Ready(None)
335            }
336            other => other,
337        }
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use std::assert_matches;
344    use std::collections::{HashMap, HashSet};
345    use std::sync::Arc;
346    use std::time::Duration;
347
348    use api::v1::region::{
349        RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
350        remote_dyn_filter_request,
351    };
352    use common_query::request::{
353        DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
354        InitialDynFilterReg, InitialDynFilterRegs, InitialDynFilterSnapshot,
355    };
356    use common_recordbatch::RecordBatches;
357    use datafusion::arrow::datatypes::Schema as ArrowSchema;
358    use datafusion::physical_plan::PhysicalExpr;
359    use datafusion::physical_plan::expressions::{DynamicFilterPhysicalExpr, lit as physical_lit};
360    use datafusion_common::DFSchema;
361    use datafusion_expr::EmptyRelation;
362    use datatypes::prelude::{ConcreteDataType, VectorRef};
363    use datatypes::schema::{ColumnSchema, Schema};
364    use datatypes::vectors::Int32Vector;
365    use futures_util::StreamExt;
366    use query::options::QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN;
367    use session::context::QueryContext;
368    use session::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
369    use session::query_id::QueryId;
370    use store_api::storage::RegionId;
371
372    use super::*;
373    use crate::region_server::registrations::{
374        REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES, RemoteDynFilterRegistry,
375    };
376    use crate::tests::mock_region_server;
377
    /// Owned copy of one registry entry, so tests can assert on it after `inspect_query`
    /// has returned.
    #[derive(Debug, Clone)]
    struct RegisteredDynFilterSnapshot {
        /// Copy of the entry's `filter_id`.
        filter_id: RemoteDynFilterId,
        /// Copy of the entry's `child_exprs_datafusion_proto`.
        child_exprs_datafusion_proto: Vec<Vec<u8>>,
        /// Copy of the entry's `subscriber_regions`.
        subscriber_regions: HashSet<RegionId>,
    }
384
385    fn query_regs(
386        regs_by_query: &RemoteDynFilterRegistry,
387        query_id: &QueryId,
388    ) -> Option<HashMap<RemoteDynFilterId, RegisteredDynFilterSnapshot>> {
389        regs_by_query.inspect_query(query_id, |query_regs| {
390            query_regs
391                .iter()
392                .map(|(filter_id, registered)| {
393                    (
394                        filter_id.clone(),
395                        RegisteredDynFilterSnapshot {
396                            filter_id: registered.filter_id.clone(),
397                            child_exprs_datafusion_proto: registered
398                                .child_exprs_datafusion_proto
399                                .clone(),
400                            subscriber_regions: registered.subscriber_regions.clone(),
401                        },
402                    )
403                })
404                .collect()
405        })
406    }
407
    /// Returns a freshly constructed `QueryId` for use as the remote query id in a test.
    fn test_remote_query_id() -> QueryId {
        QueryId::new()
    }
411
    /// Returns the fixed region id (`RegionId::new(1024, 7)`) shared by most tests.
    fn test_remote_dyn_filter_region_id() -> RegionId {
        RegionId::new(1024, 7)
    }
415
416    fn single_value_stream() -> common_recordbatch::SendableRecordBatchStream {
417        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
418            "v",
419            ConcreteDataType::int32_datatype(),
420            false,
421        )]));
422        let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
423        let batch = common_recordbatch::RecordBatch::new(schema.clone(), vec![values]).unwrap();
424        RecordBatches::try_new(schema, vec![batch])
425            .unwrap()
426            .as_stream()
427    }
428
429    fn empty_logical_plan() -> LogicalPlan {
430        LogicalPlan::EmptyRelation(EmptyRelation {
431            produce_one_row: false,
432            schema: Arc::new(DFSchema::empty()),
433        })
434    }
435
436    fn query_context_with_initial_regs(query_id: QueryId) -> QueryContext {
437        let mut query_ctx = QueryContext::with("greptime", "public");
438        query_ctx.set_extension(REMOTE_QUERY_ID_EXTENSION_KEY, query_id.to_string());
439        query_ctx.set_extension(
440            INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
441            InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])])
442                .to_extension_value()
443                .unwrap(),
444        );
445        query_ctx
446    }
447
448    #[test]
449    fn remote_dyn_filter_receiver_plan_respects_disabled_query_option() {
450        let mock_region_server = mock_region_server();
451        let query_id = test_remote_query_id();
452        let mut query_ctx = query_context_with_initial_regs(query_id);
453        query_ctx.set_extension(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "false");
454
455        let plan = remote_dyn_filter_receiver_plan(
456            &Arc::downgrade(&mock_region_server.inner),
457            empty_logical_plan(),
458            Arc::new(query_ctx),
459        );
460
461        assert!(matches!(plan, LogicalPlan::EmptyRelation(_)));
462    }
463
464    #[test]
465    fn remote_dyn_filter_cleanup_registration_respects_disabled_query_option() {
466        let mock_region_server = mock_region_server();
467        let query_id = test_remote_query_id();
468        let mut query_ctx = query_context_with_initial_regs(query_id);
469        query_ctx.set_extension(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "false");
470        let query_ctx = Arc::new(query_ctx);
471
472        let cleanup = mock_region_server.register_initial_remote_dyn_filter_cleanup(
473            &query_ctx,
474            test_remote_dyn_filter_region_id(),
475        );
476
477        assert!(cleanup.is_none());
478        assert!(
479            mock_region_server
480                .inner
481                .initial_remote_dyn_filter_registrations
482                .inspect_query(&query_id, |_| ())
483                .is_none()
484        );
485    }
486
487    #[test]
488    fn initial_dyn_filter_regs_can_be_read_from_query_context() {
489        let mut query_ctx = QueryContext::with("greptime", "public");
490        query_ctx.set_extension(
491            INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
492            InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
493                "filter-1",
494                vec![vec![1, 2, 3]],
495            )])
496            .to_extension_value()
497            .unwrap(),
498        );
499
500        let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx)).unwrap();
501
502        assert_eq!(regs.regs.len(), 1);
503        assert_eq!(regs.regs[0].filter_id, "filter-1");
504    }
505
506    #[test]
507    fn initial_dyn_filter_regs_from_query_context_rejects_duplicate_filter_ids() {
508        let mut query_ctx = QueryContext::with("greptime", "public");
509        query_ctx.set_extension(
510            INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
511            InitialDynFilterRegs::new(vec![
512                InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
513                InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
514            ])
515            .to_extension_value()
516            .unwrap(),
517        );
518
519        let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx));
520
521        assert!(regs.is_none());
522    }
523
524    #[test]
525    fn register_initial_dyn_filter_regs_creates_query_scoped_entries() {
526        let regs_by_query = RemoteDynFilterRegistry::new();
527        let regs = InitialDynFilterRegs::new(vec![
528            InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
529            InitialDynFilterReg::new("filter-2", vec![vec![4, 5, 6]]),
530        ]);
531        let query_id = test_remote_query_id();
532        let region_id = test_remote_dyn_filter_region_id();
533
534        let registered_filter_ids =
535            register_initial_dyn_filter_regs(&regs_by_query, &query_id, region_id, &regs);
536
537        let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
538        assert_eq!(query_regs.len(), 2);
539        assert_eq!(
540            registered_filter_ids,
541            vec![
542                RemoteDynFilterId::new("filter-1"),
543                RemoteDynFilterId::new("filter-2")
544            ]
545        );
546        let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
547        assert_eq!(registered.filter_id, RemoteDynFilterId::new("filter-1"));
548        assert_eq!(registered.child_exprs_datafusion_proto, vec![vec![1, 2, 3]]);
549        assert_eq!(registered.subscriber_regions.len(), 1);
550        assert!(registered.subscriber_regions.contains(&region_id));
551    }
552
553    #[test]
554    fn register_initial_dyn_filter_regs_same_region_duplicate_is_idempotent() {
555        let regs_by_query = RemoteDynFilterRegistry::new();
556        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
557            "filter-1",
558            vec![vec![1, 2, 3]],
559        )]);
560        let query_id = test_remote_query_id();
561        let region_id = test_remote_dyn_filter_region_id();
562
563        let first = register_initial_dyn_filter_regs(&regs_by_query, &query_id, region_id, &regs);
564        let duplicate =
565            register_initial_dyn_filter_regs(&regs_by_query, &query_id, region_id, &regs);
566
567        let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
568        assert_eq!(query_regs.len(), 1);
569        assert_eq!(first, vec![RemoteDynFilterId::new("filter-1")]);
570        assert!(duplicate.is_empty());
571        let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
572        assert_eq!(registered.subscriber_regions.len(), 1);
573        assert!(registered.subscriber_regions.contains(&region_id));
574    }
575
576    #[test]
577    fn register_initial_dyn_filter_regs_ignores_invalid_duplicate_payload_set() {
578        let regs_by_query = RemoteDynFilterRegistry::new();
579        let regs = InitialDynFilterRegs::new(vec![
580            InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
581            InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
582        ]);
583        let query_id = test_remote_query_id();
584        let region_id = test_remote_dyn_filter_region_id();
585
586        register_initial_dyn_filter_regs(&regs_by_query, &query_id, region_id, &regs);
587
588        assert!(query_regs(&regs_by_query, &query_id).is_none());
589    }
590
591    #[test]
592    fn register_initial_dyn_filter_regs_tracks_different_region_subscribers_for_same_filter() {
593        let regs_by_query = RemoteDynFilterRegistry::new();
594        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
595            "filter-1",
596            vec![vec![1, 2, 3]],
597        )]);
598        let query_id = test_remote_query_id();
599        let first_region_id = RegionId::new(1024, 7);
600        let second_region_id = RegionId::new(1024, 8);
601
602        register_initial_dyn_filter_regs(&regs_by_query, &query_id, first_region_id, &regs);
603        register_initial_dyn_filter_regs(&regs_by_query, &query_id, second_region_id, &regs);
604
605        let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
606        assert_eq!(query_regs.len(), 1);
607        let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
608        assert_eq!(registered.subscriber_regions.len(), 2);
609        assert!(registered.subscriber_regions.contains(&first_region_id));
610        assert!(registered.subscriber_regions.contains(&second_region_id));
611    }
612
613    #[test]
614    fn remove_initial_dyn_filter_regs_removes_registered_filter_entries() {
615        let regs_by_query = RemoteDynFilterRegistry::new();
616        let query_id = test_remote_query_id();
617        let other_query_id = test_remote_query_id();
618        let region_id = test_remote_dyn_filter_region_id();
619
620        let registered_filter_ids = register_initial_dyn_filter_regs(
621            &regs_by_query,
622            &query_id,
623            region_id,
624            &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
625                "filter-1",
626                vec![vec![1, 2, 3]],
627            )]),
628        );
629        register_initial_dyn_filter_regs(
630            &regs_by_query,
631            &other_query_id,
632            region_id,
633            &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
634                "filter-2",
635                vec![vec![4, 5, 6]],
636            )]),
637        );
638
639        remove_initial_dyn_filter_regs(
640            &regs_by_query,
641            &query_id,
642            region_id,
643            &registered_filter_ids,
644        );
645
646        assert!(query_regs(&regs_by_query, &query_id).is_none());
647        let other_query_regs = query_regs(&regs_by_query, &other_query_id).unwrap();
648        assert_eq!(other_query_regs.len(), 1);
649    }
650
651    #[test]
652    fn remove_initial_dyn_filter_regs_keeps_other_subscribers() {
653        let regs_by_query = RemoteDynFilterRegistry::new();
654        let query_id = test_remote_query_id();
655        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
656            "filter-1",
657            vec![vec![1, 2, 3]],
658        )]);
659        let first_region_id = RegionId::new(1024, 7);
660        let second_region_id = RegionId::new(1024, 8);
661
662        let first_subscription =
663            register_initial_dyn_filter_regs(&regs_by_query, &query_id, first_region_id, &regs);
664        register_initial_dyn_filter_regs(&regs_by_query, &query_id, second_region_id, &regs);
665
666        remove_initial_dyn_filter_regs(
667            &regs_by_query,
668            &query_id,
669            first_region_id,
670            &first_subscription,
671        );
672
673        let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
674        assert_eq!(query_regs.len(), 1);
675        let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
676        assert_eq!(registered.subscriber_regions.len(), 1);
677        assert!(registered.subscriber_regions.contains(&second_region_id));
678    }
679
680    #[tokio::test]
681    async fn test_handle_remote_dyn_filter_request_requires_query_id() {
682        let mock_region_server = mock_region_server();
683
684        let err = mock_region_server
685            .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
686                query_id: String::new(),
687                action: Some(remote_dyn_filter_request::Action::Unregister(
688                    RemoteDynFilterUnregister {
689                        filter_id: "filter-1".to_string(),
690                    },
691                )),
692            })
693            .await
694            .unwrap_err();
695
696        assert_matches!(
697            err,
698            crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id"
699        );
700    }
701
702    #[tokio::test]
703    async fn test_handle_remote_dyn_filter_request_requires_action() {
704        let mock_region_server = mock_region_server();
705
706        let err = mock_region_server
707            .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
708                query_id: test_remote_query_id().to_string(),
709                action: None,
710            })
711            .await
712            .unwrap_err();
713
714        assert_matches!(
715            err,
716            crate::error::Error::MissingRequiredField { ref name, .. } if name == "action"
717        );
718    }
719
720    #[tokio::test]
721    async fn test_handle_remote_dyn_filter_update_requires_filter_id() {
722        let mock_region_server = mock_region_server();
723
724        let err = mock_region_server
725            .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
726                query_id: test_remote_query_id().to_string(),
727                action: Some(remote_dyn_filter_request::Action::Update(
728                    RemoteDynFilterUpdate {
729                        filter_id: String::new(),
730                        payload: vec![1],
731                        generation: 1,
732                        is_complete: false,
733                    },
734                )),
735            })
736            .await
737            .unwrap_err();
738
739        assert_matches!(
740            err,
741            crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id"
742        );
743    }
744
745    #[tokio::test]
746    async fn test_handle_remote_dyn_filter_update_requires_payload() {
747        let mock_region_server = mock_region_server();
748
749        let err = mock_region_server
750            .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
751                query_id: test_remote_query_id().to_string(),
752                action: Some(remote_dyn_filter_request::Action::Update(
753                    RemoteDynFilterUpdate {
754                        filter_id: "filter-1".to_string(),
755                        payload: Vec::new(),
756                        generation: 1,
757                        is_complete: false,
758                    },
759                )),
760            })
761            .await
762            .unwrap_err();
763
764        assert_matches!(
765            err,
766            crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload"
767        );
768    }
769
770    #[test]
771    fn test_apply_remote_dyn_filter_update_missing_registration() {
772        let regs_by_query = RemoteDynFilterRegistry::new();
773        let query_id = test_remote_query_id();
774        let filter_id = RemoteDynFilterId::new("filter-1");
775
776        let outcome =
777            apply_remote_dyn_filter_update(&regs_by_query, &query_id, &filter_id, &[1], 1, false);
778        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::MissingRegistration);
779
780        let outcome = unregister_remote_dyn_filter(&regs_by_query, &query_id, &filter_id);
781        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::MissingRegistration);
782    }
783
784    #[test]
785    fn test_apply_remote_dyn_filter_update_buffering_before_scan() {
786        let regs_by_query = RemoteDynFilterRegistry::new();
787        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
788            "filter-1",
789            vec![vec![1, 2, 3]],
790        )]);
791        let query_id = test_remote_query_id();
792        let filter_id = RemoteDynFilterId::new("filter-1");
793
794        register_initial_dyn_filter_regs(
795            &regs_by_query,
796            &query_id,
797            test_remote_dyn_filter_region_id(),
798            &regs,
799        );
800
801        // First update with generation 1: should be Buffered (no runtime installed yet)
802        let outcome =
803            apply_remote_dyn_filter_update(&regs_by_query, &query_id, &filter_id, &[1], 1, false);
804        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
805
806        // Update with generation 0: should be Stale (older than pending generation 1)
807        let outcome =
808            apply_remote_dyn_filter_update(&regs_by_query, &query_id, &filter_id, &[2], 0, false);
809        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Stale);
810
811        // Update with generation 1 again: should be Idempotent (same generation)
812        let outcome =
813            apply_remote_dyn_filter_update(&regs_by_query, &query_id, &filter_id, &[3], 1, false);
814        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Idempotent);
815    }
816
817    fn datafusion_payload_bytes(expr: Arc<dyn PhysicalExpr>) -> Vec<u8> {
818        match DynFilterPayload::from_datafusion_expr(&expr, REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES)
819            .unwrap()
820        {
821            DynFilterPayload::Datafusion(bytes) => bytes,
822            _ => unreachable!(
823                "DynFilterPayload::from_datafusion_expr only returns datafusion payloads"
824            ),
825        }
826    }
827
    /// Returns an empty Arrow schema (`ArrowSchema::empty()`).
    fn empty_arrow_schema() -> ArrowSchema {
        ArrowSchema::empty()
    }
831
832    fn register_empty_remote_dyn_filter(
833        regs_by_query: &RemoteDynFilterRegistry,
834        query_id: &QueryId,
835    ) -> Vec<RemoteDynFilterId> {
836        register_empty_remote_dyn_filter_for_region(
837            regs_by_query,
838            query_id,
839            test_remote_dyn_filter_region_id(),
840        )
841    }
842
843    fn register_empty_remote_dyn_filter_for_region(
844        regs_by_query: &RemoteDynFilterRegistry,
845        query_id: &QueryId,
846        region_id: RegionId,
847    ) -> Vec<RemoteDynFilterId> {
848        register_initial_dyn_filter_regs(
849            regs_by_query,
850            query_id,
851            region_id,
852            &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]),
853        )
854    }
855
856    #[test]
857    fn initial_remote_dyn_filter_snapshot_initializes_runtime_filter() {
858        let regs_by_query = RemoteDynFilterRegistry::new();
859        let query_id = test_remote_query_id();
860        let payload = DynFilterPayload::Datafusion(datafusion_payload_bytes(physical_lit(false)));
861        let regs = InitialDynFilterRegs::new(vec![
862            InitialDynFilterReg::new("filter-1", vec![])
863                .with_initial_snapshot(InitialDynFilterSnapshot::new(payload, 7, false)),
864        ]);
865
866        register_initial_dyn_filter_regs(
867            &regs_by_query,
868            &query_id,
869            test_remote_dyn_filter_region_id(),
870            &regs,
871        );
872        let exprs = remote_dyn_filter_exprs_for_initial_regs(
873            &regs_by_query,
874            &query_id,
875            &regs,
876            &empty_arrow_schema(),
877        );
878
879        assert_eq!(exprs.len(), 1);
880        assert_eq!(format!("{}", exprs[0]), "DynamicFilter [ false ]");
881    }
882
883    fn only_remote_dyn_filter(
884        regs_by_query: &RemoteDynFilterRegistry,
885        query_id: &QueryId,
886    ) -> Arc<DynamicFilterPhysicalExpr> {
887        let initial_regs =
888            InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
889        let exprs = remote_dyn_filter_exprs_for_initial_regs(
890            regs_by_query,
891            query_id,
892            &initial_regs,
893            &empty_arrow_schema(),
894        );
895        assert_eq!(1, exprs.len());
896        let expr = exprs.into_iter().next().unwrap();
897        let expr = expr as Arc<dyn std::any::Any + Send + Sync>;
898        expr.downcast::<DynamicFilterPhysicalExpr>().unwrap()
899    }
900
901    #[test]
902    fn test_remote_dyn_filter_rejects_oversized_payload_before_buffering() {
903        let regs_by_query = RemoteDynFilterRegistry::new();
904        let query_id = test_remote_query_id();
905        let filter_id = RemoteDynFilterId::new("filter-1");
906        register_empty_remote_dyn_filter(&regs_by_query, &query_id);
907
908        let oversized = vec![0; REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES + 1];
909        let outcome = apply_remote_dyn_filter_update(
910            &regs_by_query,
911            &query_id,
912            &filter_id,
913            &oversized,
914            1,
915            false,
916        );
917        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::PayloadTooLarge);
918
919        // The rejected generation must not become the pending generation.
920        let outcome =
921            apply_remote_dyn_filter_update(&regs_by_query, &query_id, &filter_id, &[1], 0, false);
922        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
923    }
924
925    #[test]
926    fn test_remote_dyn_filter_generation_zero_applies_as_first_update() {
927        let regs_by_query = RemoteDynFilterRegistry::new();
928        let query_id = test_remote_query_id();
929        let filter_id = RemoteDynFilterId::new("filter-1");
930        register_empty_remote_dyn_filter(&regs_by_query, &query_id);
931
932        // Installing the wrapper before the update exercises the runtime apply path directly.
933        let dyn_filter = only_remote_dyn_filter(&regs_by_query, &query_id);
934        let payload = datafusion_payload_bytes(physical_lit(false));
935        let outcome = apply_remote_dyn_filter_update(
936            &regs_by_query,
937            &query_id,
938            &filter_id,
939            &payload,
940            0,
941            false,
942        );
943
944        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
945        assert!(dyn_filter.snapshot_generation() > 1);
946        assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
947    }
948
949    #[test]
950    fn test_buffered_update_applies_when_scan_installs_wrapper() {
951        let regs_by_query = RemoteDynFilterRegistry::new();
952        let query_id = test_remote_query_id();
953        let filter_id = RemoteDynFilterId::new("filter-1");
954        register_empty_remote_dyn_filter(&regs_by_query, &query_id);
955
956        let payload = datafusion_payload_bytes(physical_lit(false));
957        let outcome = apply_remote_dyn_filter_update(
958            &regs_by_query,
959            &query_id,
960            &filter_id,
961            &payload,
962            1,
963            false,
964        );
965        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
966
967        let dyn_filter = only_remote_dyn_filter(&regs_by_query, &query_id);
968        assert!(dyn_filter.snapshot_generation() > 1);
969        assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
970    }
971
972    #[tokio::test]
973    async fn test_unregister_completes_installed_remote_dyn_filter_without_relaxing() {
974        let regs_by_query = RemoteDynFilterRegistry::new();
975        let query_id = test_remote_query_id();
976        let filter_id = RemoteDynFilterId::new("filter-1");
977        register_empty_remote_dyn_filter(&regs_by_query, &query_id);
978
979        let dyn_filter = only_remote_dyn_filter(&regs_by_query, &query_id);
980        let payload = datafusion_payload_bytes(physical_lit(false));
981        let outcome = apply_remote_dyn_filter_update(
982            &regs_by_query,
983            &query_id,
984            &filter_id,
985            &payload,
986            1,
987            false,
988        );
989        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
990        assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
991
992        let outcome = unregister_remote_dyn_filter(&regs_by_query, &query_id, &filter_id);
993        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
994        assert!(query_regs(&regs_by_query, &query_id).is_none());
995        tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
996            .await
997            .unwrap();
998        assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
999    }
1000
1001    #[tokio::test]
1002    async fn test_remote_dyn_filter_unregister_removes_all_region_subscribers_for_filter() {
1003        let regs_by_query = RemoteDynFilterRegistry::new();
1004        let query_id = test_remote_query_id();
1005        let filter_id = RemoteDynFilterId::new("filter-1");
1006        let first_region_id = RegionId::new(1024, 7);
1007        let second_region_id = RegionId::new(1024, 8);
1008        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
1009
1010        let first_subscription =
1011            register_initial_dyn_filter_regs(&regs_by_query, &query_id, first_region_id, &regs);
1012        let second_subscription =
1013            register_initial_dyn_filter_regs(&regs_by_query, &query_id, second_region_id, &regs);
1014        let dyn_filter = only_remote_dyn_filter(&regs_by_query, &query_id);
1015
1016        let payload = datafusion_payload_bytes(physical_lit(false));
1017        let outcome = apply_remote_dyn_filter_update(
1018            &regs_by_query,
1019            &query_id,
1020            &filter_id,
1021            &payload,
1022            1,
1023            false,
1024        );
1025        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
1026
1027        let outcome = unregister_remote_dyn_filter(&regs_by_query, &query_id, &filter_id);
1028        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
1029        assert!(query_regs(&regs_by_query, &query_id).is_none());
1030        tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
1031            .await
1032            .unwrap();
1033        assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
1034
1035        // Stream-local cleanup may run after FE's peer-deduplicated unregister. It should be benign.
1036        remove_initial_dyn_filter_regs(
1037            &regs_by_query,
1038            &query_id,
1039            first_region_id,
1040            &first_subscription,
1041        );
1042        remove_initial_dyn_filter_regs(
1043            &regs_by_query,
1044            &query_id,
1045            second_region_id,
1046            &second_subscription,
1047        );
1048        assert!(query_regs(&regs_by_query, &query_id).is_none());
1049    }
1050
1051    #[test]
1052    fn test_remote_dyn_filter_unregister_keeps_other_filters_for_same_query() {
1053        let regs_by_query = RemoteDynFilterRegistry::new();
1054        let query_id = test_remote_query_id();
1055        let region_id = test_remote_dyn_filter_region_id();
1056
1057        register_initial_dyn_filter_regs(
1058            &regs_by_query,
1059            &query_id,
1060            region_id,
1061            &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]),
1062        );
1063        register_initial_dyn_filter_regs(
1064            &regs_by_query,
1065            &query_id,
1066            region_id,
1067            &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-2", vec![])]),
1068        );
1069
1070        let outcome = unregister_remote_dyn_filter(
1071            &regs_by_query,
1072            &query_id,
1073            &RemoteDynFilterId::new("filter-1"),
1074        );
1075        assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
1076
1077        let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
1078        assert!(!query_regs.contains_key(&RemoteDynFilterId::new("filter-1")));
1079        assert!(query_regs.contains_key(&RemoteDynFilterId::new("filter-2")));
1080    }
1081
1082    #[tokio::test]
1083    async fn test_remote_dyn_filter_guarded_stream_removes_on_eof() {
1084        let mock_region_server = mock_region_server();
1085        let query_id = test_remote_query_id();
1086        let region_id = test_remote_dyn_filter_region_id();
1087        let registered_filter_ids = register_empty_remote_dyn_filter(
1088            &mock_region_server
1089                .inner
1090                .initial_remote_dyn_filter_registrations,
1091            &query_id,
1092        );
1093        let cleanup = RemoteDynFilterRegistrationGuard::new(
1094            mock_region_server.clone(),
1095            query_id,
1096            region_id,
1097            registered_filter_ids,
1098        );
1099
1100        let stream = wrap_remote_dyn_filter_guarded_stream(single_value_stream(), cleanup);
1101        let mut pinned = Box::pin(stream);
1102        while pinned.next().await.is_some() {}
1103
1104        assert!(
1105            mock_region_server
1106                .inner
1107                .initial_remote_dyn_filter_registrations
1108                .inspect_query(&query_id, |_| ())
1109                .is_none()
1110        );
1111    }
1112
1113    #[tokio::test]
1114    async fn test_remote_dyn_filter_multi_region_subscription_cleanup() {
1115        let regs_by_query = RemoteDynFilterRegistry::new();
1116        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
1117        let query_id = test_remote_query_id();
1118        let first_region_id = RegionId::new(1024, 7);
1119        let second_region_id = RegionId::new(1024, 8);
1120
1121        // Register the same logical filter for two regions on the same datanode.
1122        let first_subscription =
1123            register_initial_dyn_filter_regs(&regs_by_query, &query_id, first_region_id, &regs);
1124        let second_subscription =
1125            register_initial_dyn_filter_regs(&regs_by_query, &query_id, second_region_id, &regs);
1126        let dyn_filter = only_remote_dyn_filter(&regs_by_query, &query_id);
1127
1128        // Verify only one filter entry exists and both region subscribers are tracked explicitly.
1129        {
1130            let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
1131            assert_eq!(query_regs.len(), 1);
1132            let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
1133            assert_eq!(registered.subscriber_regions.len(), 2);
1134            assert!(registered.subscriber_regions.contains(&first_region_id));
1135            assert!(registered.subscriber_regions.contains(&second_region_id));
1136        }
1137
1138        // One region cleanup should not drop the entry while another region is subscribed.
1139        remove_initial_dyn_filter_regs(
1140            &regs_by_query,
1141            &query_id,
1142            first_region_id,
1143            &first_subscription,
1144        );
1145        assert!(query_regs(&regs_by_query, &query_id).is_some());
1146        assert!(
1147            tokio::time::timeout(Duration::from_millis(50), dyn_filter.wait_complete())
1148                .await
1149                .is_err()
1150        );
1151        {
1152            let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
1153            let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
1154            assert_eq!(registered.subscriber_regions.len(), 1);
1155            assert!(registered.subscriber_regions.contains(&second_region_id));
1156        }
1157
1158        // Last region cleanup should drop the entry.
1159        remove_initial_dyn_filter_regs(
1160            &regs_by_query,
1161            &query_id,
1162            second_region_id,
1163            &second_subscription,
1164        );
1165        assert!(query_regs(&regs_by_query, &query_id).is_none());
1166        tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
1167            .await
1168            .unwrap();
1169    }
1170}