Skip to main content

query/dist_plan/remote_dyn_filter_registry.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::sync::{Arc, Mutex, RwLock, Weak};
18use std::time::Duration;
19
20use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
21use common_query::request::DynFilterPayload;
22use common_runtime::spawn_global;
23use common_telemetry::{debug, warn};
24use datafusion_physical_expr::PhysicalExpr;
25use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
26use session::query_id::QueryId;
27use store_api::storage::RegionId;
28use tokio::sync::{Notify, watch};
29
30use crate::dist_plan::FilterId;
31use crate::region_query::RegionQueryHandlerRef;
32
/// Size limit (64 KiB) handed to the payload encoder for one remote dynamic filter update.
const REMOTE_DYN_FILTER_UPDATE_PAYLOAD_MAX_BYTES: usize = 65_536;
/// Period of the fallback reconcile tick in the fanout loop; it re-reads the producer
/// in case a `wait_update()` wake-up was missed.
const REMOTE_DYN_FILTER_RECONCILE_INTERVAL: Duration = Duration::from_millis(1_000);
/// Bound best-effort RDF control RPCs so one bad subscriber cannot stall fanout.
const REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT: Duration = Duration::from_millis(10_000);
37
38/// Region subscribed to a remote dynamic filter.
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct Subscriber {
41    region_id: RegionId,
42}
43
44impl Subscriber {
45    pub fn new(region_id: RegionId) -> Self {
46        Self { region_id }
47    }
48
49    pub fn region_id(&self) -> RegionId {
50        self.region_id
51    }
52}
53
/// Result of registering a remote dynamic filter entry.
///
/// Returned by `QueryDynFilterRegistry::register_remote_dyn_filter`.
#[derive(Debug, Clone)]
pub enum EntryRegistration {
    /// No entry existed for the filter id; this contains the newly created entry.
    Inserted(Arc<DynFilterEntry>),
    /// The filter already existed; this contains the previously registered entry.
    /// The producer filter passed to the registration call is not stored in this case.
    Existing(Arc<DynFilterEntry>),
}

/// Result of registering a subscriber under an existing filter entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriberRegistration {
    /// The subscriber was not yet registered and has been added.
    Added,
    /// The subscriber was already registered; nothing changed.
    Duplicate,
    /// No entry is registered under the requested filter id.
    MissingFilter,
}
69
/// A registered query-local producer filter and its region subscribers.
#[derive(Debug)]
pub struct DynFilterEntry {
    filter_id: FilterId,
    // Held weakly so the entry does not keep the producer's dynamic filter alive;
    // the fanout task re-upgrades it on every iteration.
    producer_filter: Weak<DynamicFilterPhysicalExpr>,
    subscribers: RwLock<HashSet<Subscriber>>,
    state: Mutex<DynFilterEntryState>,
    // Signalled when a new subscriber is added so the fanout task resends.
    subscriber_changed: Notify,
}

/// Fanout bookkeeping for one [`DynFilterEntry`], guarded by its `state` mutex.
#[derive(Debug, Default)]
struct DynFilterEntryState {
    // Highest generation already fanned out; starts at 0 and is reset to 0 when a
    // new subscriber is added.
    last_sent_generation: u64,
    // Set once the unregister fanout has been claimed; cleared again for new subscribers.
    unregistered: bool,
    // Set once a fanout task has been spawned; no code in this file resets it.
    fanout_started: bool,
}

/// Lock-protected contents of [`QueryDynFilterRegistry`].
#[derive(Debug)]
struct QueryDynFilterRegistryInner {
    // Registered entries keyed by filter id.
    entries: HashMap<FilterId, Arc<DynFilterEntry>>,
}
91
92impl DynFilterEntry {
93    pub fn new(filter_id: FilterId, producer_filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
94        Self {
95            filter_id,
96            producer_filter: Arc::downgrade(&producer_filter),
97            subscribers: RwLock::new(HashSet::new()),
98            state: Mutex::new(DynFilterEntryState::default()),
99            subscriber_changed: Notify::new(),
100        }
101    }
102
103    pub fn filter_id(&self) -> &FilterId {
104        &self.filter_id
105    }
106
107    pub fn upgrade_producer_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
108        self.producer_filter.upgrade()
109    }
110
111    pub fn subscribers(&self) -> Vec<Subscriber> {
112        self.subscribers.read().unwrap().iter().cloned().collect()
113    }
114
115    pub fn register_subscriber(&self, subscriber: Subscriber) -> bool {
116        let mut subscribers = self.subscribers.write().unwrap();
117        subscribers.insert(subscriber)
118    }
119
120    fn mark_generation_sent(&self, generation: u64) -> bool {
121        let mut state = self.state.lock().unwrap();
122        if generation <= state.last_sent_generation {
123            return false;
124        }
125
126        state.last_sent_generation = generation;
127        true
128    }
129
130    fn try_mark_unregistered(&self) -> bool {
131        let mut state = self.state.lock().unwrap();
132        if state.unregistered {
133            return false;
134        }
135
136        state.unregistered = true;
137        true
138    }
139
140    fn reactivate_for_new_subscriber(&self) {
141        {
142            let mut state = self.state.lock().unwrap();
143            // Reset generation/unregister state so late subscribers get the current snapshot.
144            state.last_sent_generation = 0;
145            state.unregistered = false;
146        }
147        self.subscriber_changed.notify_one();
148    }
149
150    fn mark_fanout_started(&self) -> bool {
151        let mut state = self.state.lock().unwrap();
152        if state.fanout_started {
153            return false;
154        }
155
156        state.fanout_started = true;
157        true
158    }
159
160    #[cfg(test)]
161    pub(crate) fn fanout_started_for_test(&self) -> bool {
162        self.state.lock().unwrap().fanout_started
163    }
164}
165
/// Query-scoped registry that owns all remote dynamic filters for one query.
#[derive(Debug)]
pub struct QueryDynFilterRegistry {
    query_id: QueryId,
    // Close-only signal: no code in this file sends on it. Fanout tasks hold
    // receivers and observe closure when the registry (and this sender) is dropped.
    lifecycle_tx: watch::Sender<()>,
    inner: RwLock<QueryDynFilterRegistryInner>,
}
173
174impl QueryDynFilterRegistry {
175    pub fn new(query_id: QueryId) -> Self {
176        // Close-only lifecycle signal; dropping the registry closes it for watchers.
177        let (lifecycle_tx, _) = watch::channel(());
178        Self {
179            query_id,
180            lifecycle_tx,
181            inner: RwLock::new(QueryDynFilterRegistryInner {
182                entries: HashMap::new(),
183            }),
184        }
185    }
186
187    pub fn query_id(&self) -> QueryId {
188        self.query_id
189    }
190
191    pub fn entry_count(&self) -> usize {
192        self.inner.read().unwrap().entries.len()
193    }
194
195    pub fn entries(&self) -> Vec<Arc<DynFilterEntry>> {
196        self.inner
197            .read()
198            .unwrap()
199            .entries
200            .values()
201            .cloned()
202            .collect()
203    }
204
205    pub fn remote_dyn_filter(&self, filter_id: &FilterId) -> Option<Arc<DynFilterEntry>> {
206        self.inner.read().unwrap().entries.get(filter_id).cloned()
207    }
208
209    pub fn register_remote_dyn_filter(
210        &self,
211        filter_id: FilterId,
212        producer_filter: Arc<DynamicFilterPhysicalExpr>,
213    ) -> EntryRegistration {
214        let mut inner = self.inner.write().unwrap();
215        if let Some(existing) = inner.entries.get(&filter_id) {
216            return EntryRegistration::Existing(existing.clone());
217        }
218
219        let entry = Arc::new(DynFilterEntry::new(filter_id.clone(), producer_filter));
220        inner.entries.insert(filter_id, entry.clone());
221        EntryRegistration::Inserted(entry)
222    }
223
224    pub fn register_subscriber(
225        &self,
226        filter_id: &FilterId,
227        subscriber: Subscriber,
228    ) -> SubscriberRegistration {
229        let Some(entry) = self.inner.read().unwrap().entries.get(filter_id).cloned() else {
230            return SubscriberRegistration::MissingFilter;
231        };
232
233        if entry.register_subscriber(subscriber) {
234            // New subscribers need the current snapshot; existing subscribers may see a duplicate.
235            entry.reactivate_for_new_subscriber();
236            SubscriberRegistration::Added
237        } else {
238            SubscriberRegistration::Duplicate
239        }
240    }
241
242    /// Starts missing producer fanout watchers for the registry's entries.
243    ///
244    /// Watchers do not hold the registry alive; dropping the registry closes their lifecycle channel.
245    pub fn ensure_fanout_task(self: &Arc<Self>, region_query_handler: RegionQueryHandlerRef) {
246        for entry in self.entries() {
247            ensure_entry_fanout_task(
248                self.query_id,
249                entry,
250                region_query_handler.clone(),
251                self.lifecycle_tx.subscribe(),
252            );
253        }
254    }
255
256    #[cfg(test)]
257    async fn fanout_snapshot(
258        &self,
259        region_query_handler: &RegionQueryHandlerRef,
260        entry: &DynFilterEntry,
261        filter: &DynamicFilterPhysicalExpr,
262        is_complete: bool,
263    ) {
264        let mut lifecycle_rx = self.lifecycle_tx.subscribe();
265        fanout_snapshot_for_query(
266            self.query_id,
267            region_query_handler,
268            entry,
269            filter,
270            is_complete,
271            &mut lifecycle_rx,
272            REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT,
273        )
274        .await;
275    }
276
277    #[cfg(test)]
278    async fn unregister_all_once(&self, region_query_handler: &RegionQueryHandlerRef) {
279        for entry in self.entries() {
280            unregister_entry_once_for_query(region_query_handler, self.query_id, &entry).await;
281        }
282    }
283}
284
285fn ensure_entry_fanout_task(
286    query_id: QueryId,
287    entry: Arc<DynFilterEntry>,
288    region_query_handler: RegionQueryHandlerRef,
289    lifecycle_rx: watch::Receiver<()>,
290) {
291    if !entry.mark_fanout_started() {
292        return;
293    }
294
295    let _handle = spawn_global(async move {
296        run_entry_fanout(query_id, entry, region_query_handler, lifecycle_rx).await;
297    });
298}
299
300async fn run_entry_fanout(
301    query_id: QueryId,
302    entry: Arc<DynFilterEntry>,
303    region_query_handler: RegionQueryHandlerRef,
304    mut lifecycle_rx: watch::Receiver<()>,
305) {
306    let mut is_complete = false;
307    // Start reconcile after one interval and skip missed ticks; it is only a coalescing fallback.
308    let mut reconcile_interval = tokio::time::interval_at(
309        tokio::time::Instant::now() + REMOTE_DYN_FILTER_RECONCILE_INTERVAL,
310        REMOTE_DYN_FILTER_RECONCILE_INTERVAL,
311    );
312    reconcile_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314    loop {
315        let Some(filter) = entry.upgrade_producer_filter() else {
316            unregister_entry_once_for_query(&region_query_handler, query_id, &entry).await;
317            return;
318        };
319
320        if !fanout_snapshot_for_query(
321            query_id,
322            &region_query_handler,
323            &entry,
324            &filter,
325            is_complete,
326            &mut lifecycle_rx,
327            REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT,
328        )
329        .await
330        {
331            break;
332        }
333
334        if is_complete {
335            tokio::select! {
336                _ = entry.subscriber_changed.notified() => {}
337                result = lifecycle_rx.changed() => {
338                    if result.is_err() {
339                        break;
340                    }
341                }
342            }
343            continue;
344        }
345
346        tokio::select! {
347            _ = filter.wait_update() => {}
348            _ = filter.wait_complete() => {
349                is_complete = true;
350            }
351            // `wait_update()` can miss an update sent while an RPC is in-flight.
352            // Re-read periodically to coalesce to the latest generation.
353            _ = reconcile_interval.tick() => {}
354            _ = entry.subscriber_changed.notified() => {}
355            result = lifecycle_rx.changed() => {
356                if result.is_err() {
357                    break;
358                }
359            }
360        }
361    }
362
363    unregister_entry_once_for_query(&region_query_handler, query_id, &entry).await;
364}
365
366async fn fanout_snapshot_for_query(
367    query_id: QueryId,
368    region_query_handler: &RegionQueryHandlerRef,
369    entry: &DynFilterEntry,
370    filter: &DynamicFilterPhysicalExpr,
371    is_complete: bool,
372    lifecycle_rx: &mut watch::Receiver<()>,
373    control_rpc_timeout: Duration,
374) -> bool {
375    let Some((generation, current)) = current_stable_snapshot(filter, lifecycle_rx).await else {
376        return true;
377    };
378
379    // The entry-global watermark advances before best-effort fanout. A timed-out
380    // subscriber may miss this generation; later/complete snapshots supersede it,
381    // and RDF only prunes.
382    if !is_complete && !entry.mark_generation_sent(generation) {
383        return true;
384    }
385
386    if is_complete {
387        let _ = entry.mark_generation_sent(generation);
388    }
389
390    let payload = match DynFilterPayload::from_datafusion_expr(
391        &current,
392        REMOTE_DYN_FILTER_UPDATE_PAYLOAD_MAX_BYTES,
393    ) {
394        Ok(DynFilterPayload::Datafusion(payload)) => payload,
395        Ok(_) => {
396            warn!("Ignored unsupported remote dynamic filter producer payload");
397            return true;
398        }
399        Err(error) => {
400            warn!(error; "Failed to encode remote dynamic filter producer snapshot");
401            return true;
402        }
403    };
404
405    fanout_update_for_query(
406        query_id,
407        region_query_handler,
408        entry,
409        generation,
410        is_complete,
411        payload,
412        lifecycle_rx,
413        control_rpc_timeout,
414    )
415    .await
416}
417
418#[allow(clippy::too_many_arguments)]
419async fn fanout_update_for_query(
420    query_id: QueryId,
421    region_query_handler: &RegionQueryHandlerRef,
422    entry: &DynFilterEntry,
423    generation: u64,
424    is_complete: bool,
425    payload: Vec<u8>,
426    lifecycle_rx: &mut watch::Receiver<()>,
427    control_rpc_timeout: Duration,
428) -> bool {
429    let query_id = query_id.to_string();
430    let filter_id = entry.filter_id().to_string();
431
432    for subscriber in entry.subscribers() {
433        let update = RemoteDynFilterUpdate {
434            filter_id: filter_id.clone(),
435            payload: payload.clone(),
436            generation,
437            is_complete,
438        };
439
440        match await_control_rpc_or_lifecycle_close(
441            lifecycle_rx,
442            format!(
443                "update query_id={} filter_id={} region_id={}",
444                query_id,
445                filter_id,
446                subscriber.region_id()
447            ),
448            region_query_handler.handle_remote_dyn_filter_update(
449                subscriber.region_id(),
450                query_id.clone(),
451                update,
452            ),
453            control_rpc_timeout,
454        )
455        .await
456        {
457            ControlRpcResult::Ok(result) => {
458                if let Err(error) = result {
459                    warn!(
460                        error;
461                        "Failed to fan out remote dynamic filter update, query_id={}, filter_id={}, region_id={}",
462                        query_id,
463                        filter_id,
464                        subscriber.region_id()
465                    );
466                }
467            }
468            ControlRpcResult::TimedOut => {}
469            ControlRpcResult::LifecycleClosed => return false,
470        }
471    }
472
473    true
474}
475
476async fn unregister_entry_once_for_query(
477    region_query_handler: &RegionQueryHandlerRef,
478    query_id: QueryId,
479    entry: &DynFilterEntry,
480) {
481    if !entry.try_mark_unregistered() {
482        return;
483    }
484
485    let query_id = query_id.to_string();
486    let filter_id = entry.filter_id().to_string();
487
488    for subscriber in entry.subscribers() {
489        let unregister = RemoteDynFilterUnregister {
490            filter_id: filter_id.clone(),
491        };
492
493        let Some(result) = await_control_rpc_timeout(
494            format!(
495                "unregister query_id={} filter_id={} region_id={}",
496                query_id,
497                filter_id,
498                subscriber.region_id()
499            ),
500            region_query_handler.handle_remote_dyn_filter_unregister(
501                subscriber.region_id(),
502                query_id.clone(),
503                unregister,
504            ),
505        )
506        .await
507        else {
508            continue;
509        };
510
511        if let Err(error) = result {
512            warn!(
513                error;
514                "Failed to fan out remote dynamic filter unregister, query_id={}, filter_id={}, region_id={}",
515                query_id,
516                filter_id,
517                subscriber.region_id()
518            );
519        }
520    }
521
522    debug!("Remote dynamic filter producer unregistered subscribers");
523}
524
/// Outcome of awaiting a control RPC against its timeout and the query lifecycle.
enum ControlRpcResult<T> {
    /// The RPC finished first; carries its output.
    Ok(T),
    /// The timeout elapsed before the RPC finished.
    TimedOut,
    /// The lifecycle closed (or was already closed) before the RPC finished; the RPC
    /// future was not driven to completion.
    LifecycleClosed,
}
530
531async fn await_control_rpc_or_lifecycle_close<T>(
532    lifecycle_rx: &mut watch::Receiver<()>,
533    operation: String,
534    rpc: impl Future<Output = T>,
535    control_rpc_timeout: Duration,
536) -> ControlRpcResult<T> {
537    if lifecycle_rx.has_changed().is_err() {
538        return ControlRpcResult::LifecycleClosed;
539    }
540
541    tokio::select! {
542        biased;
543        result = lifecycle_rx.changed() => {
544            if result.is_err() {
545                debug!("Cancelled remote dynamic filter control RPC after lifecycle close");
546            }
547            ControlRpcResult::LifecycleClosed
548        }
549        result = rpc => ControlRpcResult::Ok(result),
550        _ = tokio::time::sleep(control_rpc_timeout) => {
551            warn!("Timed out remote dynamic filter control RPC: {}", operation);
552            ControlRpcResult::TimedOut
553        }
554    }
555}
556
557async fn await_control_rpc_timeout<T>(
558    operation: String,
559    rpc: impl Future<Output = T>,
560) -> Option<T> {
561    tokio::select! {
562        result = rpc => Some(result),
563        _ = tokio::time::sleep(REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT) => {
564            warn!("Timed out remote dynamic filter control RPC: {}", operation);
565            None
566        }
567    }
568}
569
570async fn current_stable_snapshot(
571    filter: &DynamicFilterPhysicalExpr,
572    lifecycle_rx: &mut watch::Receiver<()>,
573) -> Option<(u64, Arc<dyn PhysicalExpr>)> {
574    loop {
575        if lifecycle_rx.has_changed().is_err() {
576            return None;
577        }
578
579        let before = filter.snapshot_generation();
580        let current = match filter.current() {
581            Ok(current) => current,
582            Err(error) => {
583                warn!(error; "Failed to read remote dynamic filter producer snapshot");
584                return None;
585            }
586        };
587        let after = filter.snapshot_generation();
588
589        if before == after {
590            return Some((after, current));
591        }
592
593        tokio::select! {
594            biased;
595            result = lifecycle_rx.changed() => {
596                if result.is_err() {
597                    return None;
598                }
599            }
600            _ = tokio::task::yield_now() => {}
601        }
602    }
603}
604
605/// Stream-scoped lease that keeps a query registry alive.
606///
607/// Stream leases own registry lifecycle; the manager only keeps a weak index.
608#[derive(Debug)]
609pub struct RemoteDynFilterRegistryLease {
610    registry_manager: Arc<DynFilterRegistryManager>,
611    /// Always `Some` while the lease is alive.
612    ///
613    /// `Option` lets `Drop` release the strong `Arc` before pruning the weak index.
614    registry: Option<Arc<QueryDynFilterRegistry>>,
615}
616
617impl RemoteDynFilterRegistryLease {
618    fn new(
619        registry_manager: Arc<DynFilterRegistryManager>,
620        registry: Arc<QueryDynFilterRegistry>,
621    ) -> Self {
622        Self {
623            registry_manager,
624            registry: Some(registry),
625        }
626    }
627
628    pub fn registry(&self) -> &QueryDynFilterRegistry {
629        self.registry
630            .as_deref()
631            .expect("remote dyn filter registry lease must hold a registry")
632    }
633
634    pub fn ensure_fanout_task(&self, region_query_handler: RegionQueryHandlerRef) {
635        self.registry
636            .as_ref()
637            .expect("remote dyn filter registry lease must hold a registry")
638            .ensure_fanout_task(region_query_handler);
639    }
640
641    #[cfg(test)]
642    pub(crate) fn ptr_eq(&self, other: &Self) -> bool {
643        Arc::ptr_eq(
644            self.registry.as_ref().unwrap(),
645            other.registry.as_ref().unwrap(),
646        )
647    }
648}
649
650impl Drop for RemoteDynFilterRegistryLease {
651    fn drop(&mut self) {
652        let Some(registry) = self.registry.take() else {
653            return;
654        };
655        let query_id = registry.query_id();
656        let registry_weak = Arc::downgrade(&registry);
657
658        // Release this lease before pruning; concurrent drops must not observe each other's strong refs.
659        drop(registry);
660
661        let _ = self
662            .registry_manager
663            .remove_if_dropped_registry(&query_id, &registry_weak);
664    }
665}
666
/// Query-engine manager for query-scoped remote dynamic filter registries.
///
/// Weak index only; stream leases own registries through [`RemoteDynFilterRegistryLease`].
#[derive(Debug, Default)]
pub struct DynFilterRegistryManager {
    // Per-query weak entries. A dead entry is pruned when a lease drop finds it dead,
    // or is overwritten when the same query id is acquired again.
    registries: RwLock<HashMap<QueryId, Weak<QueryDynFilterRegistry>>>,
}
674
675impl DynFilterRegistryManager {
676    #[cfg(test)]
677    fn get(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
678        let (registry, stale_entry) = {
679            let registries = self.registries.read().unwrap();
680            let registry = registries.get(query_id)?;
681
682            (registry.upgrade(), registry.clone())
683        };
684
685        if registry.is_none() {
686            self.remove_stale_entry(query_id, &stale_entry);
687        }
688
689        registry
690    }
691
692    #[cfg(test)]
693    fn remove(&self, query_id: &QueryId) -> Option<Weak<QueryDynFilterRegistry>> {
694        self.registries.write().unwrap().remove(query_id)
695    }
696
697    fn remove_if_dropped_registry(
698        &self,
699        query_id: &QueryId,
700        dropped_registry: &Weak<QueryDynFilterRegistry>,
701    ) -> Option<Weak<QueryDynFilterRegistry>> {
702        let mut registries = self
703            .registries
704            .write()
705            .unwrap_or_else(|poisoned| poisoned.into_inner());
706        let current = registries.get(query_id)?;
707
708        // `ptr_eq` protects a newer registry for the same query id; `upgrade` ensures it is dead.
709        if current.ptr_eq(dropped_registry) && current.upgrade().is_none() {
710            registries.remove(query_id)
711        } else {
712            None
713        }
714    }
715
716    #[cfg(test)]
717    fn remove_stale_entry(
718        &self,
719        query_id: &QueryId,
720        stale_registry: &Weak<QueryDynFilterRegistry>,
721    ) {
722        let mut registries = self.registries.write().unwrap();
723        let Some(current) = registries.get(query_id) else {
724            return;
725        };
726
727        if current.ptr_eq(stale_registry) && current.upgrade().is_none() {
728            registries.remove(query_id);
729        }
730    }
731
732    /// Acquires the stream-owned registry lease for `query_id`.
733    ///
734    /// Returns a lease holding a strong registry reference.
735    pub fn acquire_lease(self: &Arc<Self>, query_id: QueryId) -> RemoteDynFilterRegistryLease {
736        let registry = self.get_or_init(query_id);
737        RemoteDynFilterRegistryLease::new(self.clone(), registry)
738    }
739
740    fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
741        let mut registries = self.registries.write().unwrap();
742
743        if let Some(registry) = registries.get(&query_id).and_then(Weak::upgrade) {
744            return registry;
745        }
746
747        let registry = Arc::new(QueryDynFilterRegistry::new(query_id));
748        registries.insert(query_id, Arc::downgrade(&registry));
749        registry
750    }
751
752    #[cfg(test)]
753    pub fn registry_count(&self) -> usize {
754        // Test snapshot helper; lifecycle decisions use lease-owned Arcs and weak pruning.
755        self.registries
756            .read()
757            .unwrap()
758            .values()
759            .filter(|registry| registry.strong_count() > 0)
760            .count()
761    }
762
763    #[cfg(test)]
764    fn weak_entry_count(&self) -> usize {
765        self.registries.read().unwrap().len()
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use std::sync::atomic::{AtomicBool, Ordering};
772    use std::sync::{Barrier, Mutex};
773    use std::thread;
774    use std::time::Duration;
775
776    use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
777    use async_trait::async_trait;
778    use common_query::request::QueryRequest;
779    use datafusion_physical_expr::expressions::{Column, lit};
780    use session::ReadPreference;
781    use uuid::Uuid;
782
783    use super::*;
784    use crate::dist_plan::{FilterFingerprint, RemoteDynFilterProducerId};
785    use crate::error::Result as QueryResult;
786    use crate::region_query::RegionQueryHandler;
787
    /// One `handle_remote_dyn_filter_update` call captured by the recording handler.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct RecordedUpdate {
        region_id: RegionId,
        query_id: String,
        filter_id: String,
        generation: u64,
        is_complete: bool,
        payload: Vec<u8>,
    }

    /// One `handle_remote_dyn_filter_unregister` call captured by the recording handler.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct RecordedUnregister {
        region_id: RegionId,
        query_id: String,
        filter_id: String,
    }
804
    /// Test `RegionQueryHandler` that records control RPCs and can hold one update open.
    #[derive(Default)]
    struct RecordingRegionQueryHandler {
        updates: Mutex<Vec<RecordedUpdate>>,
        unregisters: Mutex<Vec<RecordedUnregister>>,
        // One-shot flag: when set, the next update call blocks until `release_update`.
        block_next_update: AtomicBool,
        // Notified by the handler once an update call has started blocking.
        update_blocked: Notify,
        // Notified by the test to let the blocked update call return.
        release_update: Notify,
    }
813
814    impl RecordingRegionQueryHandler {
815        fn updates(&self) -> Vec<RecordedUpdate> {
816            self.updates.lock().unwrap().clone()
817        }
818
819        fn unregisters(&self) -> Vec<RecordedUnregister> {
820            self.unregisters.lock().unwrap().clone()
821        }
822
823        fn block_next_update(&self) {
824            self.block_next_update.store(true, Ordering::SeqCst);
825        }
826
827        async fn wait_for_blocked_update(&self) {
828            self.update_blocked.notified().await;
829        }
830
831        fn release_blocked_update(&self) {
832            self.release_update.notify_one();
833        }
834
835        async fn wait_for_update_count(&self, expected: usize) {
836            for _ in 0..300 {
837                if self.updates().len() >= expected {
838                    return;
839                }
840                tokio::time::sleep(Duration::from_millis(10)).await;
841            }
842            panic!("timed out waiting for {expected} remote dyn filter updates");
843        }
844
845        async fn wait_for_unregister_count(&self, expected: usize) {
846            for _ in 0..300 {
847                if self.unregisters().len() >= expected {
848                    return;
849                }
850                tokio::time::sleep(Duration::from_millis(10)).await;
851            }
852            panic!("timed out waiting for {expected} remote dyn filter unregisters");
853        }
854    }
855
856    async fn wait_for_registry_drop(registry: Weak<QueryDynFilterRegistry>) {
857        for _ in 0..300 {
858            if registry.upgrade().is_none() {
859                return;
860            }
861            tokio::time::sleep(Duration::from_millis(10)).await;
862        }
863        panic!("timed out waiting for remote dyn filter registry drop");
864    }
865
    /// Records every control RPC; remote query execution is never expected.
    #[async_trait]
    impl RegionQueryHandler for RecordingRegionQueryHandler {
        async fn do_get(
            &self,
            _read_preference: ReadPreference,
            _request: QueryRequest,
        ) -> QueryResult<common_recordbatch::SendableRecordBatchStream> {
            unreachable!("remote dyn filter registry tests should not execute remote queries")
        }

        /// Records the update, then blocks until released if the one-shot block was armed.
        async fn handle_remote_dyn_filter_update(
            &self,
            region_id: RegionId,
            query_id: String,
            update: RemoteDynFilterUpdate,
        ) -> QueryResult<()> {
            // Consume the one-shot flag so only this call blocks.
            let should_block = self.block_next_update.swap(false, Ordering::SeqCst);
            // Record before blocking so a blocked update is already visible to `updates()`.
            self.updates.lock().unwrap().push(RecordedUpdate {
                region_id,
                query_id,
                filter_id: update.filter_id,
                generation: update.generation,
                is_complete: update.is_complete,
                payload: update.payload,
            });
            if should_block {
                self.update_blocked.notify_one();
                self.release_update.notified().await;
            }
            Ok(())
        }

        /// Records the unregister and always succeeds.
        async fn handle_remote_dyn_filter_unregister(
            &self,
            region_id: RegionId,
            query_id: String,
            unregister: RemoteDynFilterUnregister,
        ) -> QueryResult<()> {
            self.unregisters.lock().unwrap().push(RecordedUnregister {
                region_id,
                query_id,
                filter_id: unregister.filter_id,
            });
            Ok(())
        }
    }
912
913    fn test_query_id(value: u128) -> QueryId {
914        QueryId::from(Uuid::from_u128(value))
915    }
916
917    fn test_filter_id(producer_ordinal: u32) -> FilterId {
918        FilterId::new(
919            RemoteDynFilterProducerId::new(42),
920            producer_ordinal,
921            FilterFingerprint::new(0xabc),
922        )
923    }
924
925    fn test_dyn_filter(names: &[&str]) -> Arc<DynamicFilterPhysicalExpr> {
926        let children = names
927            .iter()
928            .enumerate()
929            .map(|(index, name)| Arc::new(Column::new(name, index)) as _)
930            .collect();
931
932        Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true) as _))
933    }
934
    /// Two leases for the same query share one registry and one index entry.
    #[test]
    fn registry_manager_returns_same_registry_for_same_query() {
        let manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);
        let first = manager.acquire_lease(query_id);
        let second = manager.acquire_lease(query_id);

        assert!(first.ptr_eq(&second));
        assert_eq!(manager.registry_count(), 1);
        assert_eq!(manager.weak_entry_count(), 1);
    }
946
    /// `remove` unindexes the registry even while a lease still holds it.
    #[test]
    fn registry_manager_removes_registry_for_query() {
        let manager = Arc::new(DynFilterRegistryManager::default());
        let query_id = test_query_id(1);

        let lease = manager.acquire_lease(query_id);

        // The removed weak entry points at the leased registry.
        assert!(
            manager
                .remove(&query_id)
                .unwrap()
                .ptr_eq(&Arc::downgrade(lease.registry.as_ref().unwrap()))
        );
        // `lease` is still alive, but the manager no longer knows about the registry.
        assert!(manager.get(&query_id).is_none());
        assert_eq!(manager.registry_count(), 0);
        assert_eq!(manager.weak_entry_count(), 0);
    }
964
965    #[test]
966    fn registry_manager_lease_waits_for_last_query_scoped_stream() {
967        let manager = Arc::new(DynFilterRegistryManager::default());
968        let query_id = test_query_id(1);
969
970        let first = manager.acquire_lease(query_id);
971        let second = manager.acquire_lease(query_id);
972
973        assert!(first.ptr_eq(&second));
974        assert_eq!(manager.registry_count(), 1);
975        assert_eq!(manager.weak_entry_count(), 1);
976        drop(first);
977        assert_eq!(manager.registry_count(), 1);
978        assert_eq!(manager.weak_entry_count(), 1);
979
980        drop(second);
981        assert_eq!(manager.registry_count(), 0);
982        assert_eq!(manager.weak_entry_count(), 0);
983    }
984
985    #[test]
986    fn registry_manager_lease_does_not_remove_reacquired_registry() {
987        let manager = Arc::new(DynFilterRegistryManager::default());
988        let query_id = test_query_id(1);
989
990        let first = manager.acquire_lease(query_id);
991        drop(first);
992        assert_eq!(manager.registry_count(), 0);
993        assert_eq!(manager.weak_entry_count(), 0);
994
995        let second = manager.acquire_lease(query_id);
996
997        assert_eq!(manager.registry_count(), 1);
998        assert_eq!(manager.weak_entry_count(), 1);
999        drop(second);
1000        assert_eq!(manager.registry_count(), 0);
1001        assert_eq!(manager.weak_entry_count(), 0);
1002    }
1003
1004    #[test]
1005    fn registry_manager_concurrent_final_lease_drop_cleans_weak_entry() {
1006        let manager = Arc::new(DynFilterRegistryManager::default());
1007        let query_id = test_query_id(1);
1008        let first = manager.acquire_lease(query_id);
1009        let second = manager.acquire_lease(query_id);
1010        let barrier = Arc::new(Barrier::new(3));
1011
1012        let first_barrier = barrier.clone();
1013        let first_drop = thread::spawn(move || {
1014            first_barrier.wait();
1015            drop(first);
1016        });
1017
1018        let second_barrier = barrier.clone();
1019        let second_drop = thread::spawn(move || {
1020            second_barrier.wait();
1021            drop(second);
1022        });
1023
1024        barrier.wait();
1025        first_drop.join().unwrap();
1026        second_drop.join().unwrap();
1027
1028        assert_eq!(manager.registry_count(), 0);
1029        assert_eq!(manager.weak_entry_count(), 0);
1030    }
1031
1032    #[test]
1033    fn registry_manager_concurrent_first_acquire_shares_registry() {
1034        let manager = Arc::new(DynFilterRegistryManager::default());
1035        let query_id = test_query_id(1);
1036        let worker_count = 8;
1037        let barrier = Arc::new(Barrier::new(worker_count + 1));
1038
1039        let handles = (0..worker_count)
1040            .map(|_| {
1041                let manager = manager.clone();
1042                let barrier = barrier.clone();
1043                thread::spawn(move || {
1044                    barrier.wait();
1045                    manager.acquire_lease(query_id)
1046                })
1047            })
1048            .collect::<Vec<_>>();
1049
1050        barrier.wait();
1051        let leases = handles
1052            .into_iter()
1053            .map(|handle| handle.join().unwrap())
1054            .collect::<Vec<_>>();
1055
1056        let first = leases.first().unwrap();
1057        assert!(leases.iter().all(|lease| first.ptr_eq(lease)));
1058        assert_eq!(manager.registry_count(), 1);
1059        assert_eq!(manager.weak_entry_count(), 1);
1060
1061        drop(leases);
1062        assert_eq!(manager.registry_count(), 0);
1063        assert_eq!(manager.weak_entry_count(), 0);
1064    }
1065
1066    #[test]
1067    fn registry_manager_drop_racing_acquire_does_not_leave_stale_entry() {
1068        let manager = Arc::new(DynFilterRegistryManager::default());
1069        let query_id = test_query_id(1);
1070
1071        for _ in 0..64 {
1072            let old_lease = manager.acquire_lease(query_id);
1073            let barrier = Arc::new(Barrier::new(3));
1074
1075            let drop_barrier = barrier.clone();
1076            let drop_thread = thread::spawn(move || {
1077                drop_barrier.wait();
1078                drop(old_lease);
1079            });
1080
1081            let acquire_manager = manager.clone();
1082            let acquire_barrier = barrier.clone();
1083            let acquire_thread = thread::spawn(move || {
1084                acquire_barrier.wait();
1085                acquire_manager.acquire_lease(query_id)
1086            });
1087
1088            barrier.wait();
1089            drop_thread.join().unwrap();
1090            let new_lease = acquire_thread.join().unwrap();
1091
1092            assert_eq!(manager.registry_count(), 1);
1093            assert_eq!(manager.weak_entry_count(), 1);
1094            drop(new_lease);
1095            assert_eq!(manager.registry_count(), 0);
1096            assert_eq!(manager.weak_entry_count(), 0);
1097        }
1098    }
1099
1100    #[test]
1101    fn registry_manager_old_drop_cannot_remove_replacement_registry() {
1102        let manager = Arc::new(DynFilterRegistryManager::default());
1103        let query_id = test_query_id(1);
1104        let old_lease = manager.acquire_lease(query_id);
1105        let old_registry = Arc::downgrade(old_lease.registry.as_ref().unwrap());
1106
1107        drop(old_lease);
1108        assert_eq!(manager.registry_count(), 0);
1109        assert_eq!(manager.weak_entry_count(), 0);
1110
1111        let replacement_lease = manager.acquire_lease(query_id);
1112        assert_eq!(manager.registry_count(), 1);
1113        assert_eq!(manager.weak_entry_count(), 1);
1114
1115        assert!(
1116            manager
1117                .remove_if_dropped_registry(&query_id, &old_registry)
1118                .is_none(),
1119            "old registry cleanup must not remove the replacement weak entry"
1120        );
1121        assert_eq!(manager.registry_count(), 1);
1122        assert_eq!(manager.weak_entry_count(), 1);
1123
1124        drop(replacement_lease);
1125        assert_eq!(manager.registry_count(), 0);
1126        assert_eq!(manager.weak_entry_count(), 0);
1127    }
1128
1129    #[test]
1130    fn registry_stores_filter_and_deduplicates_subscribers() {
1131        let registry = QueryDynFilterRegistry::new(test_query_id(1));
1132        let filter = test_dyn_filter(&["host"]);
1133        let filter_id = test_filter_id(1);
1134        let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1135            EntryRegistration::Inserted(entry) => entry,
1136            other => panic!("unexpected registration result: {other:?}"),
1137        };
1138
1139        assert_eq!(entry.filter_id(), &filter_id);
1140        assert_eq!(registry.entry_count(), 1);
1141
1142        let subscriber = Subscriber::new(RegionId::new(1024, 1));
1143        assert_eq!(
1144            registry.register_subscriber(&filter_id, subscriber.clone()),
1145            SubscriberRegistration::Added
1146        );
1147        assert_eq!(
1148            registry.register_subscriber(&filter_id, subscriber),
1149            SubscriberRegistration::Duplicate
1150        );
1151        assert_eq!(entry.subscribers().len(), 1);
1152    }
1153
1154    #[tokio::test]
1155    async fn fanout_sends_changed_generations_to_subscribers() {
1156        let query_id = test_query_id(1);
1157        let registry = Arc::new(QueryDynFilterRegistry::new(query_id));
1158        let filter = test_dyn_filter(&["host"]);
1159        let filter_id = test_filter_id(1);
1160        let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1161            EntryRegistration::Inserted(entry) => entry,
1162            other => panic!("unexpected registration result: {other:?}"),
1163        };
1164        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1165        assert_eq!(
1166            registry.register_subscriber(&filter_id, subscriber.clone()),
1167            SubscriberRegistration::Added
1168        );
1169
1170        let handler = Arc::new(RecordingRegionQueryHandler::default());
1171        let handler_ref = handler.clone() as RegionQueryHandlerRef;
1172
1173        registry
1174            .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1175            .await;
1176        let updates = handler.updates();
1177        assert_eq!(updates.len(), 1);
1178        assert_eq!(updates[0].region_id, subscriber.region_id());
1179        assert_eq!(updates[0].query_id, query_id.to_string());
1180        assert_eq!(updates[0].filter_id, filter_id.to_string());
1181        assert_eq!(updates[0].generation, filter.snapshot_generation());
1182        assert!(!updates[0].is_complete);
1183        assert!(!updates[0].payload.is_empty());
1184
1185        registry
1186            .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1187            .await;
1188        assert_eq!(handler.updates().len(), 1);
1189
1190        filter.update(lit(false) as _).unwrap();
1191        registry
1192            .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1193            .await;
1194        let updates = handler.updates();
1195        assert_eq!(updates.len(), 2);
1196        assert_eq!(updates[1].generation, filter.snapshot_generation());
1197
1198        let second_subscriber = Subscriber::new(RegionId::new(1024, 8));
1199        assert_eq!(
1200            registry.register_subscriber(&filter_id, second_subscriber.clone()),
1201            SubscriberRegistration::Added
1202        );
1203        registry
1204            .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1205            .await;
1206        let updates = handler.updates();
1207        assert_eq!(updates.len(), 4);
1208        assert!(
1209            updates[2..]
1210                .iter()
1211                .any(|update| update.region_id == subscriber.region_id())
1212        );
1213        assert!(
1214            updates[2..]
1215                .iter()
1216                .any(|update| update.region_id == second_subscriber.region_id())
1217        );
1218        assert_eq!(entry.subscribers().len(), 2);
1219    }
1220
1221    #[tokio::test]
1222    async fn fanout_task_waits_for_dynamic_filter_notifications() {
1223        let query_id = test_query_id(3);
1224        let manager = Arc::new(DynFilterRegistryManager::default());
1225        let lease = manager.acquire_lease(query_id);
1226        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1227        let filter = test_dyn_filter(&["host"]);
1228        let filter_id = test_filter_id(1);
1229        let _ = lease
1230            .registry()
1231            .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1232        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1233        assert_eq!(
1234            lease
1235                .registry()
1236                .register_subscriber(&filter_id, subscriber.clone()),
1237            SubscriberRegistration::Added
1238        );
1239
1240        let handler = Arc::new(RecordingRegionQueryHandler::default());
1241        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1242
1243        handler.wait_for_update_count(1).await;
1244        let initial_generation = handler.updates()[0].generation;
1245
1246        filter.update(lit(false) as _).unwrap();
1247        handler.wait_for_update_count(2).await;
1248        let updates = handler.updates();
1249        assert!(updates[1].generation > initial_generation);
1250        assert_eq!(updates[1].region_id, subscriber.region_id());
1251        assert_eq!(updates[1].filter_id, filter_id.to_string());
1252
1253        filter.mark_complete();
1254        handler.wait_for_update_count(3).await;
1255        let updates = handler.updates();
1256        assert!(updates[2].is_complete);
1257
1258        drop(lease);
1259        handler.wait_for_unregister_count(1).await;
1260        let unregisters = handler.unregisters();
1261        assert_eq!(unregisters[0].region_id, subscriber.region_id());
1262        assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1263
1264        wait_for_registry_drop(registry_weak).await;
1265    }
1266
1267    #[tokio::test]
1268    async fn repeated_ensure_fanout_task_keeps_single_watcher() {
1269        let query_id = test_query_id(6);
1270        let manager = Arc::new(DynFilterRegistryManager::default());
1271        let lease = manager.acquire_lease(query_id);
1272        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1273        let filter = test_dyn_filter(&["host"]);
1274        let filter_id = test_filter_id(1);
1275        let entry = match lease
1276            .registry()
1277            .register_remote_dyn_filter(filter_id.clone(), filter.clone())
1278        {
1279            EntryRegistration::Inserted(entry) => entry,
1280            other => panic!("unexpected registration result: {other:?}"),
1281        };
1282        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1283        assert_eq!(
1284            lease
1285                .registry()
1286                .register_subscriber(&filter_id, subscriber.clone()),
1287            SubscriberRegistration::Added
1288        );
1289
1290        let handler = Arc::new(RecordingRegionQueryHandler::default());
1291        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1292        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1293
1294        assert!(entry.fanout_started_for_test());
1295        handler.wait_for_update_count(1).await;
1296        tokio::time::sleep(Duration::from_millis(50)).await;
1297        assert_eq!(handler.updates().len(), 1);
1298
1299        filter.update(lit(false) as _).unwrap();
1300        handler.wait_for_update_count(2).await;
1301        tokio::time::sleep(Duration::from_millis(50)).await;
1302        assert_eq!(handler.updates().len(), 2);
1303
1304        drop(lease);
1305        handler.wait_for_unregister_count(1).await;
1306        wait_for_registry_drop(registry_weak).await;
1307    }
1308
1309    #[tokio::test]
1310    async fn fanout_task_resends_complete_snapshot_to_late_subscriber() {
1311        let query_id = test_query_id(7);
1312        let manager = Arc::new(DynFilterRegistryManager::default());
1313        let lease = manager.acquire_lease(query_id);
1314        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1315        let filter = test_dyn_filter(&["host"]);
1316        let filter_id = test_filter_id(1);
1317        let _ = lease
1318            .registry()
1319            .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1320        let first_subscriber = Subscriber::new(RegionId::new(1024, 7));
1321        assert_eq!(
1322            lease
1323                .registry()
1324                .register_subscriber(&filter_id, first_subscriber.clone()),
1325            SubscriberRegistration::Added
1326        );
1327
1328        let handler = Arc::new(RecordingRegionQueryHandler::default());
1329        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1330        handler.wait_for_update_count(1).await;
1331
1332        filter.mark_complete();
1333        handler.wait_for_update_count(2).await;
1334        assert!(handler.updates()[1].is_complete);
1335
1336        let late_subscriber = Subscriber::new(RegionId::new(1024, 8));
1337        assert_eq!(
1338            lease
1339                .registry()
1340                .register_subscriber(&filter_id, late_subscriber.clone()),
1341            SubscriberRegistration::Added
1342        );
1343
1344        handler.wait_for_update_count(4).await;
1345        let updates = handler.updates();
1346        assert!(
1347            updates[2..].iter().any(
1348                |update| update.region_id == first_subscriber.region_id() && update.is_complete
1349            )
1350        );
1351        assert!(
1352            updates[2..]
1353                .iter()
1354                .any(|update| update.region_id == late_subscriber.region_id()
1355                    && update.is_complete)
1356        );
1357
1358        drop(lease);
1359        handler.wait_for_unregister_count(1).await;
1360        wait_for_registry_drop(registry_weak).await;
1361    }
1362
1363    #[tokio::test]
1364    async fn fanout_task_unregisters_when_producer_filter_is_dropped() {
1365        let query_id = test_query_id(8);
1366        let manager = Arc::new(DynFilterRegistryManager::default());
1367        let lease = manager.acquire_lease(query_id);
1368        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1369        let filter = test_dyn_filter(&["host"]);
1370        let filter_id = test_filter_id(1);
1371        let _ = lease
1372            .registry()
1373            .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1374        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1375        assert_eq!(
1376            lease
1377                .registry()
1378                .register_subscriber(&filter_id, subscriber.clone()),
1379            SubscriberRegistration::Added
1380        );
1381
1382        let handler = Arc::new(RecordingRegionQueryHandler::default());
1383        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1384        handler.wait_for_update_count(1).await;
1385
1386        drop(filter);
1387        handler.wait_for_unregister_count(1).await;
1388        let unregisters = handler.unregisters();
1389        assert_eq!(unregisters[0].region_id, subscriber.region_id());
1390        assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1391
1392        drop(lease);
1393        wait_for_registry_drop(registry_weak).await;
1394    }
1395
1396    #[tokio::test]
1397    async fn reconcile_tick_catches_update_while_fanout_is_in_flight() {
1398        let query_id = test_query_id(4);
1399        let manager = Arc::new(DynFilterRegistryManager::default());
1400        let lease = manager.acquire_lease(query_id);
1401        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1402        let filter = test_dyn_filter(&["host"]);
1403        let filter_id = test_filter_id(1);
1404        let _ = lease
1405            .registry()
1406            .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1407        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1408        assert_eq!(
1409            lease
1410                .registry()
1411                .register_subscriber(&filter_id, subscriber.clone()),
1412            SubscriberRegistration::Added
1413        );
1414
1415        let handler = Arc::new(RecordingRegionQueryHandler::default());
1416        handler.block_next_update();
1417        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1418
1419        handler.wait_for_blocked_update().await;
1420        let initial_generation = handler.updates()[0].generation;
1421
1422        // Update before the watcher can subscribe again; reconcile must catch it.
1423        filter.update(lit(false) as _).unwrap();
1424        handler.release_blocked_update();
1425
1426        handler.wait_for_update_count(2).await;
1427        let updates = handler.updates();
1428        assert!(updates[1].generation > initial_generation);
1429        assert_eq!(updates[1].region_id, subscriber.region_id());
1430        assert_eq!(updates[1].filter_id, filter_id.to_string());
1431
1432        drop(lease);
1433        handler.wait_for_unregister_count(1).await;
1434        wait_for_registry_drop(registry_weak).await;
1435    }
1436
1437    #[tokio::test]
1438    async fn fanout_task_unregisters_after_lifecycle_close_during_blocked_update() {
1439        let query_id = test_query_id(5);
1440        let manager = Arc::new(DynFilterRegistryManager::default());
1441        let lease = manager.acquire_lease(query_id);
1442        let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1443        let filter = test_dyn_filter(&["host"]);
1444        let filter_id = test_filter_id(1);
1445        let _ = lease
1446            .registry()
1447            .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1448        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1449        assert_eq!(
1450            lease
1451                .registry()
1452                .register_subscriber(&filter_id, subscriber.clone()),
1453            SubscriberRegistration::Added
1454        );
1455
1456        let handler = Arc::new(RecordingRegionQueryHandler::default());
1457        handler.block_next_update();
1458        lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1459
1460        handler.wait_for_blocked_update().await;
1461        drop(lease);
1462
1463        handler.wait_for_unregister_count(1).await;
1464        let unregisters = handler.unregisters();
1465        assert_eq!(unregisters[0].region_id, subscriber.region_id());
1466        assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1467        wait_for_registry_drop(registry_weak).await;
1468    }
1469
1470    #[tokio::test]
1471    async fn update_timeout_does_not_stop_fanout_for_other_subscribers() {
1472        let query_id = test_query_id(9);
1473        let registry = QueryDynFilterRegistry::new(query_id);
1474        let filter = test_dyn_filter(&["host"]);
1475        let filter_id = test_filter_id(1);
1476        let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1477            EntryRegistration::Inserted(entry) => entry,
1478            other => panic!("unexpected registration result: {other:?}"),
1479        };
1480        let first_subscriber = Subscriber::new(RegionId::new(1024, 7));
1481        let second_subscriber = Subscriber::new(RegionId::new(1024, 8));
1482        assert_eq!(
1483            registry.register_subscriber(&filter_id, first_subscriber.clone()),
1484            SubscriberRegistration::Added
1485        );
1486        assert_eq!(
1487            registry.register_subscriber(&filter_id, second_subscriber.clone()),
1488            SubscriberRegistration::Added
1489        );
1490
1491        let handler = Arc::new(RecordingRegionQueryHandler::default());
1492        let handler_ref = handler.clone() as RegionQueryHandlerRef;
1493        let mut lifecycle_rx = registry.lifecycle_tx.subscribe();
1494        handler.block_next_update();
1495        assert!(
1496            fanout_snapshot_for_query(
1497                query_id,
1498                &handler_ref,
1499                &entry,
1500                filter.as_ref(),
1501                false,
1502                &mut lifecycle_rx,
1503                Duration::from_millis(100),
1504            )
1505            .await
1506        );
1507
1508        handler.wait_for_blocked_update().await;
1509        // Fanout is serial and the blocked RPC stays blocked; the second update proves
1510        // timeout continued to the next subscriber.
1511        handler.wait_for_update_count(2).await;
1512
1513        let initial_updates = handler.updates();
1514        assert_eq!(
1515            initial_updates.len(),
1516            2,
1517            "the healthy subscriber must still receive the update after another subscriber times out"
1518        );
1519        assert!(
1520            initial_updates
1521                .iter()
1522                .any(|update| update.region_id == first_subscriber.region_id())
1523        );
1524        assert!(
1525            initial_updates
1526                .iter()
1527                .any(|update| update.region_id == second_subscriber.region_id())
1528        );
1529
1530        filter.update(lit(false) as _).unwrap();
1531        assert!(
1532            fanout_snapshot_for_query(
1533                query_id,
1534                &handler_ref,
1535                &entry,
1536                filter.as_ref(),
1537                false,
1538                &mut lifecycle_rx,
1539                Duration::from_millis(100),
1540            )
1541            .await
1542        );
1543        handler.wait_for_update_count(4).await;
1544        let updates = handler.updates();
1545        assert!(
1546            updates[2..]
1547                .iter()
1548                .any(|update| update.region_id == first_subscriber.region_id())
1549        );
1550        assert!(
1551            updates[2..]
1552                .iter()
1553                .any(|update| update.region_id == second_subscriber.region_id())
1554        );
1555
1556        registry.unregister_all_once(&handler_ref).await;
1557        handler.wait_for_unregister_count(1).await;
1558    }
1559
1560    #[tokio::test]
1561    async fn unregister_fanout_is_idempotent() {
1562        let query_id = test_query_id(2);
1563        let registry = QueryDynFilterRegistry::new(query_id);
1564        let filter = test_dyn_filter(&["host"]);
1565        let filter_id = test_filter_id(1);
1566        let _ = registry.register_remote_dyn_filter(filter_id.clone(), filter);
1567        let subscriber = Subscriber::new(RegionId::new(1024, 7));
1568        assert_eq!(
1569            registry.register_subscriber(&filter_id, subscriber.clone()),
1570            SubscriberRegistration::Added
1571        );
1572
1573        let handler = Arc::new(RecordingRegionQueryHandler::default());
1574        let handler_ref = handler.clone() as RegionQueryHandlerRef;
1575
1576        registry.unregister_all_once(&handler_ref).await;
1577        registry.unregister_all_once(&handler_ref).await;
1578
1579        let unregisters = handler.unregisters();
1580        assert_eq!(unregisters.len(), 1);
1581        assert_eq!(unregisters[0].region_id, subscriber.region_id());
1582        assert_eq!(unregisters[0].query_id, query_id.to_string());
1583        assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1584    }
1585}