1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::sync::{Arc, Mutex, RwLock, Weak};
18use std::time::Duration;
19
20use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
21use common_query::request::DynFilterPayload;
22use common_runtime::spawn_global;
23use common_telemetry::{debug, warn};
24use datafusion_physical_expr::PhysicalExpr;
25use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
26use session::query_id::QueryId;
27use store_api::storage::RegionId;
28use tokio::sync::{Notify, watch};
29
30use crate::dist_plan::FilterId;
31use crate::region_query::RegionQueryHandlerRef;
32
33const REMOTE_DYN_FILTER_UPDATE_PAYLOAD_MAX_BYTES: usize = 64 * 1024;
34const REMOTE_DYN_FILTER_RECONCILE_INTERVAL: Duration = Duration::from_secs(1);
35const REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT: Duration = Duration::from_secs(10);
37
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct Subscriber {
41 region_id: RegionId,
42}
43
44impl Subscriber {
45 pub fn new(region_id: RegionId) -> Self {
46 Self { region_id }
47 }
48
49 pub fn region_id(&self) -> RegionId {
50 self.region_id
51 }
52}
53
54#[derive(Debug, Clone)]
56pub enum EntryRegistration {
57 Inserted(Arc<DynFilterEntry>),
58 Existing(Arc<DynFilterEntry>),
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum SubscriberRegistration {
65 Added,
66 Duplicate,
67 MissingFilter,
68}
69
70#[derive(Debug)]
72pub struct DynFilterEntry {
73 filter_id: FilterId,
74 producer_filter: Weak<DynamicFilterPhysicalExpr>,
75 subscribers: RwLock<HashSet<Subscriber>>,
76 state: Mutex<DynFilterEntryState>,
77 subscriber_changed: Notify,
78}
79
80#[derive(Debug, Default)]
81struct DynFilterEntryState {
82 last_sent_generation: u64,
83 unregistered: bool,
84 fanout_started: bool,
85}
86
87#[derive(Debug)]
88struct QueryDynFilterRegistryInner {
89 entries: HashMap<FilterId, Arc<DynFilterEntry>>,
90}
91
92impl DynFilterEntry {
93 pub fn new(filter_id: FilterId, producer_filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
94 Self {
95 filter_id,
96 producer_filter: Arc::downgrade(&producer_filter),
97 subscribers: RwLock::new(HashSet::new()),
98 state: Mutex::new(DynFilterEntryState::default()),
99 subscriber_changed: Notify::new(),
100 }
101 }
102
103 pub fn filter_id(&self) -> &FilterId {
104 &self.filter_id
105 }
106
107 pub fn upgrade_producer_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
108 self.producer_filter.upgrade()
109 }
110
111 pub fn subscribers(&self) -> Vec<Subscriber> {
112 self.subscribers.read().unwrap().iter().cloned().collect()
113 }
114
115 pub fn register_subscriber(&self, subscriber: Subscriber) -> bool {
116 let mut subscribers = self.subscribers.write().unwrap();
117 subscribers.insert(subscriber)
118 }
119
120 fn mark_generation_sent(&self, generation: u64) -> bool {
121 let mut state = self.state.lock().unwrap();
122 if generation <= state.last_sent_generation {
123 return false;
124 }
125
126 state.last_sent_generation = generation;
127 true
128 }
129
130 fn try_mark_unregistered(&self) -> bool {
131 let mut state = self.state.lock().unwrap();
132 if state.unregistered {
133 return false;
134 }
135
136 state.unregistered = true;
137 true
138 }
139
140 fn reactivate_for_new_subscriber(&self) {
141 {
142 let mut state = self.state.lock().unwrap();
143 state.last_sent_generation = 0;
145 state.unregistered = false;
146 }
147 self.subscriber_changed.notify_one();
148 }
149
150 fn mark_fanout_started(&self) -> bool {
151 let mut state = self.state.lock().unwrap();
152 if state.fanout_started {
153 return false;
154 }
155
156 state.fanout_started = true;
157 true
158 }
159
160 #[cfg(test)]
161 pub(crate) fn fanout_started_for_test(&self) -> bool {
162 self.state.lock().unwrap().fanout_started
163 }
164}
165
166#[derive(Debug)]
168pub struct QueryDynFilterRegistry {
169 query_id: QueryId,
170 lifecycle_tx: watch::Sender<()>,
171 inner: RwLock<QueryDynFilterRegistryInner>,
172}
173
174impl QueryDynFilterRegistry {
175 pub fn new(query_id: QueryId) -> Self {
176 let (lifecycle_tx, _) = watch::channel(());
178 Self {
179 query_id,
180 lifecycle_tx,
181 inner: RwLock::new(QueryDynFilterRegistryInner {
182 entries: HashMap::new(),
183 }),
184 }
185 }
186
187 pub fn query_id(&self) -> QueryId {
188 self.query_id
189 }
190
191 pub fn entry_count(&self) -> usize {
192 self.inner.read().unwrap().entries.len()
193 }
194
195 pub fn entries(&self) -> Vec<Arc<DynFilterEntry>> {
196 self.inner
197 .read()
198 .unwrap()
199 .entries
200 .values()
201 .cloned()
202 .collect()
203 }
204
205 pub fn remote_dyn_filter(&self, filter_id: &FilterId) -> Option<Arc<DynFilterEntry>> {
206 self.inner.read().unwrap().entries.get(filter_id).cloned()
207 }
208
209 pub fn register_remote_dyn_filter(
210 &self,
211 filter_id: FilterId,
212 producer_filter: Arc<DynamicFilterPhysicalExpr>,
213 ) -> EntryRegistration {
214 let mut inner = self.inner.write().unwrap();
215 if let Some(existing) = inner.entries.get(&filter_id) {
216 return EntryRegistration::Existing(existing.clone());
217 }
218
219 let entry = Arc::new(DynFilterEntry::new(filter_id.clone(), producer_filter));
220 inner.entries.insert(filter_id, entry.clone());
221 EntryRegistration::Inserted(entry)
222 }
223
224 pub fn register_subscriber(
225 &self,
226 filter_id: &FilterId,
227 subscriber: Subscriber,
228 ) -> SubscriberRegistration {
229 let Some(entry) = self.inner.read().unwrap().entries.get(filter_id).cloned() else {
230 return SubscriberRegistration::MissingFilter;
231 };
232
233 if entry.register_subscriber(subscriber) {
234 entry.reactivate_for_new_subscriber();
236 SubscriberRegistration::Added
237 } else {
238 SubscriberRegistration::Duplicate
239 }
240 }
241
242 pub fn ensure_fanout_task(self: &Arc<Self>, region_query_handler: RegionQueryHandlerRef) {
246 for entry in self.entries() {
247 ensure_entry_fanout_task(
248 self.query_id,
249 entry,
250 region_query_handler.clone(),
251 self.lifecycle_tx.subscribe(),
252 );
253 }
254 }
255
256 #[cfg(test)]
257 async fn fanout_snapshot(
258 &self,
259 region_query_handler: &RegionQueryHandlerRef,
260 entry: &DynFilterEntry,
261 filter: &DynamicFilterPhysicalExpr,
262 is_complete: bool,
263 ) {
264 let mut lifecycle_rx = self.lifecycle_tx.subscribe();
265 fanout_snapshot_for_query(
266 self.query_id,
267 region_query_handler,
268 entry,
269 filter,
270 is_complete,
271 &mut lifecycle_rx,
272 REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT,
273 )
274 .await;
275 }
276
277 #[cfg(test)]
278 async fn unregister_all_once(&self, region_query_handler: &RegionQueryHandlerRef) {
279 for entry in self.entries() {
280 unregister_entry_once_for_query(region_query_handler, self.query_id, &entry).await;
281 }
282 }
283}
284
285fn ensure_entry_fanout_task(
286 query_id: QueryId,
287 entry: Arc<DynFilterEntry>,
288 region_query_handler: RegionQueryHandlerRef,
289 lifecycle_rx: watch::Receiver<()>,
290) {
291 if !entry.mark_fanout_started() {
292 return;
293 }
294
295 let _handle = spawn_global(async move {
296 run_entry_fanout(query_id, entry, region_query_handler, lifecycle_rx).await;
297 });
298}
299
300async fn run_entry_fanout(
301 query_id: QueryId,
302 entry: Arc<DynFilterEntry>,
303 region_query_handler: RegionQueryHandlerRef,
304 mut lifecycle_rx: watch::Receiver<()>,
305) {
306 let mut is_complete = false;
307 let mut reconcile_interval = tokio::time::interval_at(
309 tokio::time::Instant::now() + REMOTE_DYN_FILTER_RECONCILE_INTERVAL,
310 REMOTE_DYN_FILTER_RECONCILE_INTERVAL,
311 );
312 reconcile_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
313
314 loop {
315 let Some(filter) = entry.upgrade_producer_filter() else {
316 unregister_entry_once_for_query(®ion_query_handler, query_id, &entry).await;
317 return;
318 };
319
320 if !fanout_snapshot_for_query(
321 query_id,
322 ®ion_query_handler,
323 &entry,
324 &filter,
325 is_complete,
326 &mut lifecycle_rx,
327 REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT,
328 )
329 .await
330 {
331 break;
332 }
333
334 if is_complete {
335 tokio::select! {
336 _ = entry.subscriber_changed.notified() => {}
337 result = lifecycle_rx.changed() => {
338 if result.is_err() {
339 break;
340 }
341 }
342 }
343 continue;
344 }
345
346 tokio::select! {
347 _ = filter.wait_update() => {}
348 _ = filter.wait_complete() => {
349 is_complete = true;
350 }
351 _ = reconcile_interval.tick() => {}
354 _ = entry.subscriber_changed.notified() => {}
355 result = lifecycle_rx.changed() => {
356 if result.is_err() {
357 break;
358 }
359 }
360 }
361 }
362
363 unregister_entry_once_for_query(®ion_query_handler, query_id, &entry).await;
364}
365
366async fn fanout_snapshot_for_query(
367 query_id: QueryId,
368 region_query_handler: &RegionQueryHandlerRef,
369 entry: &DynFilterEntry,
370 filter: &DynamicFilterPhysicalExpr,
371 is_complete: bool,
372 lifecycle_rx: &mut watch::Receiver<()>,
373 control_rpc_timeout: Duration,
374) -> bool {
375 let Some((generation, current)) = current_stable_snapshot(filter, lifecycle_rx).await else {
376 return true;
377 };
378
379 if !is_complete && !entry.mark_generation_sent(generation) {
383 return true;
384 }
385
386 if is_complete {
387 let _ = entry.mark_generation_sent(generation);
388 }
389
390 let payload = match DynFilterPayload::from_datafusion_expr(
391 ¤t,
392 REMOTE_DYN_FILTER_UPDATE_PAYLOAD_MAX_BYTES,
393 ) {
394 Ok(DynFilterPayload::Datafusion(payload)) => payload,
395 Ok(_) => {
396 warn!("Ignored unsupported remote dynamic filter producer payload");
397 return true;
398 }
399 Err(error) => {
400 warn!(error; "Failed to encode remote dynamic filter producer snapshot");
401 return true;
402 }
403 };
404
405 fanout_update_for_query(
406 query_id,
407 region_query_handler,
408 entry,
409 generation,
410 is_complete,
411 payload,
412 lifecycle_rx,
413 control_rpc_timeout,
414 )
415 .await
416}
417
418#[allow(clippy::too_many_arguments)]
419async fn fanout_update_for_query(
420 query_id: QueryId,
421 region_query_handler: &RegionQueryHandlerRef,
422 entry: &DynFilterEntry,
423 generation: u64,
424 is_complete: bool,
425 payload: Vec<u8>,
426 lifecycle_rx: &mut watch::Receiver<()>,
427 control_rpc_timeout: Duration,
428) -> bool {
429 let query_id = query_id.to_string();
430 let filter_id = entry.filter_id().to_string();
431
432 for subscriber in entry.subscribers() {
433 let update = RemoteDynFilterUpdate {
434 filter_id: filter_id.clone(),
435 payload: payload.clone(),
436 generation,
437 is_complete,
438 };
439
440 match await_control_rpc_or_lifecycle_close(
441 lifecycle_rx,
442 format!(
443 "update query_id={} filter_id={} region_id={}",
444 query_id,
445 filter_id,
446 subscriber.region_id()
447 ),
448 region_query_handler.handle_remote_dyn_filter_update(
449 subscriber.region_id(),
450 query_id.clone(),
451 update,
452 ),
453 control_rpc_timeout,
454 )
455 .await
456 {
457 ControlRpcResult::Ok(result) => {
458 if let Err(error) = result {
459 warn!(
460 error;
461 "Failed to fan out remote dynamic filter update, query_id={}, filter_id={}, region_id={}",
462 query_id,
463 filter_id,
464 subscriber.region_id()
465 );
466 }
467 }
468 ControlRpcResult::TimedOut => {}
469 ControlRpcResult::LifecycleClosed => return false,
470 }
471 }
472
473 true
474}
475
476async fn unregister_entry_once_for_query(
477 region_query_handler: &RegionQueryHandlerRef,
478 query_id: QueryId,
479 entry: &DynFilterEntry,
480) {
481 if !entry.try_mark_unregistered() {
482 return;
483 }
484
485 let query_id = query_id.to_string();
486 let filter_id = entry.filter_id().to_string();
487
488 for subscriber in entry.subscribers() {
489 let unregister = RemoteDynFilterUnregister {
490 filter_id: filter_id.clone(),
491 };
492
493 let Some(result) = await_control_rpc_timeout(
494 format!(
495 "unregister query_id={} filter_id={} region_id={}",
496 query_id,
497 filter_id,
498 subscriber.region_id()
499 ),
500 region_query_handler.handle_remote_dyn_filter_unregister(
501 subscriber.region_id(),
502 query_id.clone(),
503 unregister,
504 ),
505 )
506 .await
507 else {
508 continue;
509 };
510
511 if let Err(error) = result {
512 warn!(
513 error;
514 "Failed to fan out remote dynamic filter unregister, query_id={}, filter_id={}, region_id={}",
515 query_id,
516 filter_id,
517 subscriber.region_id()
518 );
519 }
520 }
521
522 debug!("Remote dynamic filter producer unregistered subscribers");
523}
524
525enum ControlRpcResult<T> {
526 Ok(T),
527 TimedOut,
528 LifecycleClosed,
529}
530
531async fn await_control_rpc_or_lifecycle_close<T>(
532 lifecycle_rx: &mut watch::Receiver<()>,
533 operation: String,
534 rpc: impl Future<Output = T>,
535 control_rpc_timeout: Duration,
536) -> ControlRpcResult<T> {
537 if lifecycle_rx.has_changed().is_err() {
538 return ControlRpcResult::LifecycleClosed;
539 }
540
541 tokio::select! {
542 biased;
543 result = lifecycle_rx.changed() => {
544 if result.is_err() {
545 debug!("Cancelled remote dynamic filter control RPC after lifecycle close");
546 }
547 ControlRpcResult::LifecycleClosed
548 }
549 result = rpc => ControlRpcResult::Ok(result),
550 _ = tokio::time::sleep(control_rpc_timeout) => {
551 warn!("Timed out remote dynamic filter control RPC: {}", operation);
552 ControlRpcResult::TimedOut
553 }
554 }
555}
556
557async fn await_control_rpc_timeout<T>(
558 operation: String,
559 rpc: impl Future<Output = T>,
560) -> Option<T> {
561 tokio::select! {
562 result = rpc => Some(result),
563 _ = tokio::time::sleep(REMOTE_DYN_FILTER_CONTROL_RPC_TIMEOUT) => {
564 warn!("Timed out remote dynamic filter control RPC: {}", operation);
565 None
566 }
567 }
568}
569
570async fn current_stable_snapshot(
571 filter: &DynamicFilterPhysicalExpr,
572 lifecycle_rx: &mut watch::Receiver<()>,
573) -> Option<(u64, Arc<dyn PhysicalExpr>)> {
574 loop {
575 if lifecycle_rx.has_changed().is_err() {
576 return None;
577 }
578
579 let before = filter.snapshot_generation();
580 let current = match filter.current() {
581 Ok(current) => current,
582 Err(error) => {
583 warn!(error; "Failed to read remote dynamic filter producer snapshot");
584 return None;
585 }
586 };
587 let after = filter.snapshot_generation();
588
589 if before == after {
590 return Some((after, current));
591 }
592
593 tokio::select! {
594 biased;
595 result = lifecycle_rx.changed() => {
596 if result.is_err() {
597 return None;
598 }
599 }
600 _ = tokio::task::yield_now() => {}
601 }
602 }
603}
604
605#[derive(Debug)]
609pub struct RemoteDynFilterRegistryLease {
610 registry_manager: Arc<DynFilterRegistryManager>,
611 registry: Option<Arc<QueryDynFilterRegistry>>,
615}
616
617impl RemoteDynFilterRegistryLease {
618 fn new(
619 registry_manager: Arc<DynFilterRegistryManager>,
620 registry: Arc<QueryDynFilterRegistry>,
621 ) -> Self {
622 Self {
623 registry_manager,
624 registry: Some(registry),
625 }
626 }
627
628 pub fn registry(&self) -> &QueryDynFilterRegistry {
629 self.registry
630 .as_deref()
631 .expect("remote dyn filter registry lease must hold a registry")
632 }
633
634 pub fn ensure_fanout_task(&self, region_query_handler: RegionQueryHandlerRef) {
635 self.registry
636 .as_ref()
637 .expect("remote dyn filter registry lease must hold a registry")
638 .ensure_fanout_task(region_query_handler);
639 }
640
641 #[cfg(test)]
642 pub(crate) fn ptr_eq(&self, other: &Self) -> bool {
643 Arc::ptr_eq(
644 self.registry.as_ref().unwrap(),
645 other.registry.as_ref().unwrap(),
646 )
647 }
648}
649
650impl Drop for RemoteDynFilterRegistryLease {
651 fn drop(&mut self) {
652 let Some(registry) = self.registry.take() else {
653 return;
654 };
655 let query_id = registry.query_id();
656 let registry_weak = Arc::downgrade(®istry);
657
658 drop(registry);
660
661 let _ = self
662 .registry_manager
663 .remove_if_dropped_registry(&query_id, ®istry_weak);
664 }
665}
666
667#[derive(Debug, Default)]
671pub struct DynFilterRegistryManager {
672 registries: RwLock<HashMap<QueryId, Weak<QueryDynFilterRegistry>>>,
673}
674
675impl DynFilterRegistryManager {
676 #[cfg(test)]
677 fn get(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
678 let (registry, stale_entry) = {
679 let registries = self.registries.read().unwrap();
680 let registry = registries.get(query_id)?;
681
682 (registry.upgrade(), registry.clone())
683 };
684
685 if registry.is_none() {
686 self.remove_stale_entry(query_id, &stale_entry);
687 }
688
689 registry
690 }
691
692 #[cfg(test)]
693 fn remove(&self, query_id: &QueryId) -> Option<Weak<QueryDynFilterRegistry>> {
694 self.registries.write().unwrap().remove(query_id)
695 }
696
697 fn remove_if_dropped_registry(
698 &self,
699 query_id: &QueryId,
700 dropped_registry: &Weak<QueryDynFilterRegistry>,
701 ) -> Option<Weak<QueryDynFilterRegistry>> {
702 let mut registries = self
703 .registries
704 .write()
705 .unwrap_or_else(|poisoned| poisoned.into_inner());
706 let current = registries.get(query_id)?;
707
708 if current.ptr_eq(dropped_registry) && current.upgrade().is_none() {
710 registries.remove(query_id)
711 } else {
712 None
713 }
714 }
715
716 #[cfg(test)]
717 fn remove_stale_entry(
718 &self,
719 query_id: &QueryId,
720 stale_registry: &Weak<QueryDynFilterRegistry>,
721 ) {
722 let mut registries = self.registries.write().unwrap();
723 let Some(current) = registries.get(query_id) else {
724 return;
725 };
726
727 if current.ptr_eq(stale_registry) && current.upgrade().is_none() {
728 registries.remove(query_id);
729 }
730 }
731
732 pub fn acquire_lease(self: &Arc<Self>, query_id: QueryId) -> RemoteDynFilterRegistryLease {
736 let registry = self.get_or_init(query_id);
737 RemoteDynFilterRegistryLease::new(self.clone(), registry)
738 }
739
740 fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
741 let mut registries = self.registries.write().unwrap();
742
743 if let Some(registry) = registries.get(&query_id).and_then(Weak::upgrade) {
744 return registry;
745 }
746
747 let registry = Arc::new(QueryDynFilterRegistry::new(query_id));
748 registries.insert(query_id, Arc::downgrade(®istry));
749 registry
750 }
751
752 #[cfg(test)]
753 pub fn registry_count(&self) -> usize {
754 self.registries
756 .read()
757 .unwrap()
758 .values()
759 .filter(|registry| registry.strong_count() > 0)
760 .count()
761 }
762
763 #[cfg(test)]
764 fn weak_entry_count(&self) -> usize {
765 self.registries.read().unwrap().len()
766 }
767}
768
769#[cfg(test)]
770mod tests {
771 use std::sync::atomic::{AtomicBool, Ordering};
772 use std::sync::{Barrier, Mutex};
773 use std::thread;
774 use std::time::Duration;
775
776 use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
777 use async_trait::async_trait;
778 use common_query::request::QueryRequest;
779 use datafusion_physical_expr::expressions::{Column, lit};
780 use session::ReadPreference;
781 use uuid::Uuid;
782
783 use super::*;
784 use crate::dist_plan::{FilterFingerprint, RemoteDynFilterProducerId};
785 use crate::error::Result as QueryResult;
786 use crate::region_query::RegionQueryHandler;
787
788 #[derive(Debug, Clone, PartialEq, Eq)]
789 struct RecordedUpdate {
790 region_id: RegionId,
791 query_id: String,
792 filter_id: String,
793 generation: u64,
794 is_complete: bool,
795 payload: Vec<u8>,
796 }
797
798 #[derive(Debug, Clone, PartialEq, Eq)]
799 struct RecordedUnregister {
800 region_id: RegionId,
801 query_id: String,
802 filter_id: String,
803 }
804
805 #[derive(Default)]
806 struct RecordingRegionQueryHandler {
807 updates: Mutex<Vec<RecordedUpdate>>,
808 unregisters: Mutex<Vec<RecordedUnregister>>,
809 block_next_update: AtomicBool,
810 update_blocked: Notify,
811 release_update: Notify,
812 }
813
814 impl RecordingRegionQueryHandler {
815 fn updates(&self) -> Vec<RecordedUpdate> {
816 self.updates.lock().unwrap().clone()
817 }
818
819 fn unregisters(&self) -> Vec<RecordedUnregister> {
820 self.unregisters.lock().unwrap().clone()
821 }
822
823 fn block_next_update(&self) {
824 self.block_next_update.store(true, Ordering::SeqCst);
825 }
826
827 async fn wait_for_blocked_update(&self) {
828 self.update_blocked.notified().await;
829 }
830
831 fn release_blocked_update(&self) {
832 self.release_update.notify_one();
833 }
834
835 async fn wait_for_update_count(&self, expected: usize) {
836 for _ in 0..300 {
837 if self.updates().len() >= expected {
838 return;
839 }
840 tokio::time::sleep(Duration::from_millis(10)).await;
841 }
842 panic!("timed out waiting for {expected} remote dyn filter updates");
843 }
844
845 async fn wait_for_unregister_count(&self, expected: usize) {
846 for _ in 0..300 {
847 if self.unregisters().len() >= expected {
848 return;
849 }
850 tokio::time::sleep(Duration::from_millis(10)).await;
851 }
852 panic!("timed out waiting for {expected} remote dyn filter unregisters");
853 }
854 }
855
856 async fn wait_for_registry_drop(registry: Weak<QueryDynFilterRegistry>) {
857 for _ in 0..300 {
858 if registry.upgrade().is_none() {
859 return;
860 }
861 tokio::time::sleep(Duration::from_millis(10)).await;
862 }
863 panic!("timed out waiting for remote dyn filter registry drop");
864 }
865
866 #[async_trait]
867 impl RegionQueryHandler for RecordingRegionQueryHandler {
868 async fn do_get(
869 &self,
870 _read_preference: ReadPreference,
871 _request: QueryRequest,
872 ) -> QueryResult<common_recordbatch::SendableRecordBatchStream> {
873 unreachable!("remote dyn filter registry tests should not execute remote queries")
874 }
875
876 async fn handle_remote_dyn_filter_update(
877 &self,
878 region_id: RegionId,
879 query_id: String,
880 update: RemoteDynFilterUpdate,
881 ) -> QueryResult<()> {
882 let should_block = self.block_next_update.swap(false, Ordering::SeqCst);
883 self.updates.lock().unwrap().push(RecordedUpdate {
884 region_id,
885 query_id,
886 filter_id: update.filter_id,
887 generation: update.generation,
888 is_complete: update.is_complete,
889 payload: update.payload,
890 });
891 if should_block {
892 self.update_blocked.notify_one();
893 self.release_update.notified().await;
894 }
895 Ok(())
896 }
897
898 async fn handle_remote_dyn_filter_unregister(
899 &self,
900 region_id: RegionId,
901 query_id: String,
902 unregister: RemoteDynFilterUnregister,
903 ) -> QueryResult<()> {
904 self.unregisters.lock().unwrap().push(RecordedUnregister {
905 region_id,
906 query_id,
907 filter_id: unregister.filter_id,
908 });
909 Ok(())
910 }
911 }
912
913 fn test_query_id(value: u128) -> QueryId {
914 QueryId::from(Uuid::from_u128(value))
915 }
916
917 fn test_filter_id(producer_ordinal: u32) -> FilterId {
918 FilterId::new(
919 RemoteDynFilterProducerId::new(42),
920 producer_ordinal,
921 FilterFingerprint::new(0xabc),
922 )
923 }
924
925 fn test_dyn_filter(names: &[&str]) -> Arc<DynamicFilterPhysicalExpr> {
926 let children = names
927 .iter()
928 .enumerate()
929 .map(|(index, name)| Arc::new(Column::new(name, index)) as _)
930 .collect();
931
932 Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true) as _))
933 }
934
935 #[test]
936 fn registry_manager_returns_same_registry_for_same_query() {
937 let manager = Arc::new(DynFilterRegistryManager::default());
938 let query_id = test_query_id(1);
939 let first = manager.acquire_lease(query_id);
940 let second = manager.acquire_lease(query_id);
941
942 assert!(first.ptr_eq(&second));
943 assert_eq!(manager.registry_count(), 1);
944 assert_eq!(manager.weak_entry_count(), 1);
945 }
946
947 #[test]
948 fn registry_manager_removes_registry_for_query() {
949 let manager = Arc::new(DynFilterRegistryManager::default());
950 let query_id = test_query_id(1);
951
952 let lease = manager.acquire_lease(query_id);
953
954 assert!(
955 manager
956 .remove(&query_id)
957 .unwrap()
958 .ptr_eq(&Arc::downgrade(lease.registry.as_ref().unwrap()))
959 );
960 assert!(manager.get(&query_id).is_none());
961 assert_eq!(manager.registry_count(), 0);
962 assert_eq!(manager.weak_entry_count(), 0);
963 }
964
965 #[test]
966 fn registry_manager_lease_waits_for_last_query_scoped_stream() {
967 let manager = Arc::new(DynFilterRegistryManager::default());
968 let query_id = test_query_id(1);
969
970 let first = manager.acquire_lease(query_id);
971 let second = manager.acquire_lease(query_id);
972
973 assert!(first.ptr_eq(&second));
974 assert_eq!(manager.registry_count(), 1);
975 assert_eq!(manager.weak_entry_count(), 1);
976 drop(first);
977 assert_eq!(manager.registry_count(), 1);
978 assert_eq!(manager.weak_entry_count(), 1);
979
980 drop(second);
981 assert_eq!(manager.registry_count(), 0);
982 assert_eq!(manager.weak_entry_count(), 0);
983 }
984
985 #[test]
986 fn registry_manager_lease_does_not_remove_reacquired_registry() {
987 let manager = Arc::new(DynFilterRegistryManager::default());
988 let query_id = test_query_id(1);
989
990 let first = manager.acquire_lease(query_id);
991 drop(first);
992 assert_eq!(manager.registry_count(), 0);
993 assert_eq!(manager.weak_entry_count(), 0);
994
995 let second = manager.acquire_lease(query_id);
996
997 assert_eq!(manager.registry_count(), 1);
998 assert_eq!(manager.weak_entry_count(), 1);
999 drop(second);
1000 assert_eq!(manager.registry_count(), 0);
1001 assert_eq!(manager.weak_entry_count(), 0);
1002 }
1003
1004 #[test]
1005 fn registry_manager_concurrent_final_lease_drop_cleans_weak_entry() {
1006 let manager = Arc::new(DynFilterRegistryManager::default());
1007 let query_id = test_query_id(1);
1008 let first = manager.acquire_lease(query_id);
1009 let second = manager.acquire_lease(query_id);
1010 let barrier = Arc::new(Barrier::new(3));
1011
1012 let first_barrier = barrier.clone();
1013 let first_drop = thread::spawn(move || {
1014 first_barrier.wait();
1015 drop(first);
1016 });
1017
1018 let second_barrier = barrier.clone();
1019 let second_drop = thread::spawn(move || {
1020 second_barrier.wait();
1021 drop(second);
1022 });
1023
1024 barrier.wait();
1025 first_drop.join().unwrap();
1026 second_drop.join().unwrap();
1027
1028 assert_eq!(manager.registry_count(), 0);
1029 assert_eq!(manager.weak_entry_count(), 0);
1030 }
1031
1032 #[test]
1033 fn registry_manager_concurrent_first_acquire_shares_registry() {
1034 let manager = Arc::new(DynFilterRegistryManager::default());
1035 let query_id = test_query_id(1);
1036 let worker_count = 8;
1037 let barrier = Arc::new(Barrier::new(worker_count + 1));
1038
1039 let handles = (0..worker_count)
1040 .map(|_| {
1041 let manager = manager.clone();
1042 let barrier = barrier.clone();
1043 thread::spawn(move || {
1044 barrier.wait();
1045 manager.acquire_lease(query_id)
1046 })
1047 })
1048 .collect::<Vec<_>>();
1049
1050 barrier.wait();
1051 let leases = handles
1052 .into_iter()
1053 .map(|handle| handle.join().unwrap())
1054 .collect::<Vec<_>>();
1055
1056 let first = leases.first().unwrap();
1057 assert!(leases.iter().all(|lease| first.ptr_eq(lease)));
1058 assert_eq!(manager.registry_count(), 1);
1059 assert_eq!(manager.weak_entry_count(), 1);
1060
1061 drop(leases);
1062 assert_eq!(manager.registry_count(), 0);
1063 assert_eq!(manager.weak_entry_count(), 0);
1064 }
1065
1066 #[test]
1067 fn registry_manager_drop_racing_acquire_does_not_leave_stale_entry() {
1068 let manager = Arc::new(DynFilterRegistryManager::default());
1069 let query_id = test_query_id(1);
1070
1071 for _ in 0..64 {
1072 let old_lease = manager.acquire_lease(query_id);
1073 let barrier = Arc::new(Barrier::new(3));
1074
1075 let drop_barrier = barrier.clone();
1076 let drop_thread = thread::spawn(move || {
1077 drop_barrier.wait();
1078 drop(old_lease);
1079 });
1080
1081 let acquire_manager = manager.clone();
1082 let acquire_barrier = barrier.clone();
1083 let acquire_thread = thread::spawn(move || {
1084 acquire_barrier.wait();
1085 acquire_manager.acquire_lease(query_id)
1086 });
1087
1088 barrier.wait();
1089 drop_thread.join().unwrap();
1090 let new_lease = acquire_thread.join().unwrap();
1091
1092 assert_eq!(manager.registry_count(), 1);
1093 assert_eq!(manager.weak_entry_count(), 1);
1094 drop(new_lease);
1095 assert_eq!(manager.registry_count(), 0);
1096 assert_eq!(manager.weak_entry_count(), 0);
1097 }
1098 }
1099
1100 #[test]
1101 fn registry_manager_old_drop_cannot_remove_replacement_registry() {
1102 let manager = Arc::new(DynFilterRegistryManager::default());
1103 let query_id = test_query_id(1);
1104 let old_lease = manager.acquire_lease(query_id);
1105 let old_registry = Arc::downgrade(old_lease.registry.as_ref().unwrap());
1106
1107 drop(old_lease);
1108 assert_eq!(manager.registry_count(), 0);
1109 assert_eq!(manager.weak_entry_count(), 0);
1110
1111 let replacement_lease = manager.acquire_lease(query_id);
1112 assert_eq!(manager.registry_count(), 1);
1113 assert_eq!(manager.weak_entry_count(), 1);
1114
1115 assert!(
1116 manager
1117 .remove_if_dropped_registry(&query_id, &old_registry)
1118 .is_none(),
1119 "old registry cleanup must not remove the replacement weak entry"
1120 );
1121 assert_eq!(manager.registry_count(), 1);
1122 assert_eq!(manager.weak_entry_count(), 1);
1123
1124 drop(replacement_lease);
1125 assert_eq!(manager.registry_count(), 0);
1126 assert_eq!(manager.weak_entry_count(), 0);
1127 }
1128
1129 #[test]
1130 fn registry_stores_filter_and_deduplicates_subscribers() {
1131 let registry = QueryDynFilterRegistry::new(test_query_id(1));
1132 let filter = test_dyn_filter(&["host"]);
1133 let filter_id = test_filter_id(1);
1134 let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1135 EntryRegistration::Inserted(entry) => entry,
1136 other => panic!("unexpected registration result: {other:?}"),
1137 };
1138
1139 assert_eq!(entry.filter_id(), &filter_id);
1140 assert_eq!(registry.entry_count(), 1);
1141
1142 let subscriber = Subscriber::new(RegionId::new(1024, 1));
1143 assert_eq!(
1144 registry.register_subscriber(&filter_id, subscriber.clone()),
1145 SubscriberRegistration::Added
1146 );
1147 assert_eq!(
1148 registry.register_subscriber(&filter_id, subscriber),
1149 SubscriberRegistration::Duplicate
1150 );
1151 assert_eq!(entry.subscribers().len(), 1);
1152 }
1153
1154 #[tokio::test]
1155 async fn fanout_sends_changed_generations_to_subscribers() {
1156 let query_id = test_query_id(1);
1157 let registry = Arc::new(QueryDynFilterRegistry::new(query_id));
1158 let filter = test_dyn_filter(&["host"]);
1159 let filter_id = test_filter_id(1);
1160 let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1161 EntryRegistration::Inserted(entry) => entry,
1162 other => panic!("unexpected registration result: {other:?}"),
1163 };
1164 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1165 assert_eq!(
1166 registry.register_subscriber(&filter_id, subscriber.clone()),
1167 SubscriberRegistration::Added
1168 );
1169
1170 let handler = Arc::new(RecordingRegionQueryHandler::default());
1171 let handler_ref = handler.clone() as RegionQueryHandlerRef;
1172
1173 registry
1174 .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1175 .await;
1176 let updates = handler.updates();
1177 assert_eq!(updates.len(), 1);
1178 assert_eq!(updates[0].region_id, subscriber.region_id());
1179 assert_eq!(updates[0].query_id, query_id.to_string());
1180 assert_eq!(updates[0].filter_id, filter_id.to_string());
1181 assert_eq!(updates[0].generation, filter.snapshot_generation());
1182 assert!(!updates[0].is_complete);
1183 assert!(!updates[0].payload.is_empty());
1184
1185 registry
1186 .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1187 .await;
1188 assert_eq!(handler.updates().len(), 1);
1189
1190 filter.update(lit(false) as _).unwrap();
1191 registry
1192 .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1193 .await;
1194 let updates = handler.updates();
1195 assert_eq!(updates.len(), 2);
1196 assert_eq!(updates[1].generation, filter.snapshot_generation());
1197
1198 let second_subscriber = Subscriber::new(RegionId::new(1024, 8));
1199 assert_eq!(
1200 registry.register_subscriber(&filter_id, second_subscriber.clone()),
1201 SubscriberRegistration::Added
1202 );
1203 registry
1204 .fanout_snapshot(&handler_ref, &entry, filter.as_ref(), false)
1205 .await;
1206 let updates = handler.updates();
1207 assert_eq!(updates.len(), 4);
1208 assert!(
1209 updates[2..]
1210 .iter()
1211 .any(|update| update.region_id == subscriber.region_id())
1212 );
1213 assert!(
1214 updates[2..]
1215 .iter()
1216 .any(|update| update.region_id == second_subscriber.region_id())
1217 );
1218 assert_eq!(entry.subscribers().len(), 2);
1219 }
1220
1221 #[tokio::test]
1222 async fn fanout_task_waits_for_dynamic_filter_notifications() {
1223 let query_id = test_query_id(3);
1224 let manager = Arc::new(DynFilterRegistryManager::default());
1225 let lease = manager.acquire_lease(query_id);
1226 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1227 let filter = test_dyn_filter(&["host"]);
1228 let filter_id = test_filter_id(1);
1229 let _ = lease
1230 .registry()
1231 .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1232 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1233 assert_eq!(
1234 lease
1235 .registry()
1236 .register_subscriber(&filter_id, subscriber.clone()),
1237 SubscriberRegistration::Added
1238 );
1239
1240 let handler = Arc::new(RecordingRegionQueryHandler::default());
1241 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1242
1243 handler.wait_for_update_count(1).await;
1244 let initial_generation = handler.updates()[0].generation;
1245
1246 filter.update(lit(false) as _).unwrap();
1247 handler.wait_for_update_count(2).await;
1248 let updates = handler.updates();
1249 assert!(updates[1].generation > initial_generation);
1250 assert_eq!(updates[1].region_id, subscriber.region_id());
1251 assert_eq!(updates[1].filter_id, filter_id.to_string());
1252
1253 filter.mark_complete();
1254 handler.wait_for_update_count(3).await;
1255 let updates = handler.updates();
1256 assert!(updates[2].is_complete);
1257
1258 drop(lease);
1259 handler.wait_for_unregister_count(1).await;
1260 let unregisters = handler.unregisters();
1261 assert_eq!(unregisters[0].region_id, subscriber.region_id());
1262 assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1263
1264 wait_for_registry_drop(registry_weak).await;
1265 }
1266
1267 #[tokio::test]
1268 async fn repeated_ensure_fanout_task_keeps_single_watcher() {
1269 let query_id = test_query_id(6);
1270 let manager = Arc::new(DynFilterRegistryManager::default());
1271 let lease = manager.acquire_lease(query_id);
1272 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1273 let filter = test_dyn_filter(&["host"]);
1274 let filter_id = test_filter_id(1);
1275 let entry = match lease
1276 .registry()
1277 .register_remote_dyn_filter(filter_id.clone(), filter.clone())
1278 {
1279 EntryRegistration::Inserted(entry) => entry,
1280 other => panic!("unexpected registration result: {other:?}"),
1281 };
1282 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1283 assert_eq!(
1284 lease
1285 .registry()
1286 .register_subscriber(&filter_id, subscriber.clone()),
1287 SubscriberRegistration::Added
1288 );
1289
1290 let handler = Arc::new(RecordingRegionQueryHandler::default());
1291 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1292 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1293
1294 assert!(entry.fanout_started_for_test());
1295 handler.wait_for_update_count(1).await;
1296 tokio::time::sleep(Duration::from_millis(50)).await;
1297 assert_eq!(handler.updates().len(), 1);
1298
1299 filter.update(lit(false) as _).unwrap();
1300 handler.wait_for_update_count(2).await;
1301 tokio::time::sleep(Duration::from_millis(50)).await;
1302 assert_eq!(handler.updates().len(), 2);
1303
1304 drop(lease);
1305 handler.wait_for_unregister_count(1).await;
1306 wait_for_registry_drop(registry_weak).await;
1307 }
1308
1309 #[tokio::test]
1310 async fn fanout_task_resends_complete_snapshot_to_late_subscriber() {
1311 let query_id = test_query_id(7);
1312 let manager = Arc::new(DynFilterRegistryManager::default());
1313 let lease = manager.acquire_lease(query_id);
1314 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1315 let filter = test_dyn_filter(&["host"]);
1316 let filter_id = test_filter_id(1);
1317 let _ = lease
1318 .registry()
1319 .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1320 let first_subscriber = Subscriber::new(RegionId::new(1024, 7));
1321 assert_eq!(
1322 lease
1323 .registry()
1324 .register_subscriber(&filter_id, first_subscriber.clone()),
1325 SubscriberRegistration::Added
1326 );
1327
1328 let handler = Arc::new(RecordingRegionQueryHandler::default());
1329 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1330 handler.wait_for_update_count(1).await;
1331
1332 filter.mark_complete();
1333 handler.wait_for_update_count(2).await;
1334 assert!(handler.updates()[1].is_complete);
1335
1336 let late_subscriber = Subscriber::new(RegionId::new(1024, 8));
1337 assert_eq!(
1338 lease
1339 .registry()
1340 .register_subscriber(&filter_id, late_subscriber.clone()),
1341 SubscriberRegistration::Added
1342 );
1343
1344 handler.wait_for_update_count(4).await;
1345 let updates = handler.updates();
1346 assert!(
1347 updates[2..].iter().any(
1348 |update| update.region_id == first_subscriber.region_id() && update.is_complete
1349 )
1350 );
1351 assert!(
1352 updates[2..]
1353 .iter()
1354 .any(|update| update.region_id == late_subscriber.region_id()
1355 && update.is_complete)
1356 );
1357
1358 drop(lease);
1359 handler.wait_for_unregister_count(1).await;
1360 wait_for_registry_drop(registry_weak).await;
1361 }
1362
1363 #[tokio::test]
1364 async fn fanout_task_unregisters_when_producer_filter_is_dropped() {
1365 let query_id = test_query_id(8);
1366 let manager = Arc::new(DynFilterRegistryManager::default());
1367 let lease = manager.acquire_lease(query_id);
1368 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1369 let filter = test_dyn_filter(&["host"]);
1370 let filter_id = test_filter_id(1);
1371 let _ = lease
1372 .registry()
1373 .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1374 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1375 assert_eq!(
1376 lease
1377 .registry()
1378 .register_subscriber(&filter_id, subscriber.clone()),
1379 SubscriberRegistration::Added
1380 );
1381
1382 let handler = Arc::new(RecordingRegionQueryHandler::default());
1383 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1384 handler.wait_for_update_count(1).await;
1385
1386 drop(filter);
1387 handler.wait_for_unregister_count(1).await;
1388 let unregisters = handler.unregisters();
1389 assert_eq!(unregisters[0].region_id, subscriber.region_id());
1390 assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1391
1392 drop(lease);
1393 wait_for_registry_drop(registry_weak).await;
1394 }
1395
1396 #[tokio::test]
1397 async fn reconcile_tick_catches_update_while_fanout_is_in_flight() {
1398 let query_id = test_query_id(4);
1399 let manager = Arc::new(DynFilterRegistryManager::default());
1400 let lease = manager.acquire_lease(query_id);
1401 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1402 let filter = test_dyn_filter(&["host"]);
1403 let filter_id = test_filter_id(1);
1404 let _ = lease
1405 .registry()
1406 .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1407 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1408 assert_eq!(
1409 lease
1410 .registry()
1411 .register_subscriber(&filter_id, subscriber.clone()),
1412 SubscriberRegistration::Added
1413 );
1414
1415 let handler = Arc::new(RecordingRegionQueryHandler::default());
1416 handler.block_next_update();
1417 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1418
1419 handler.wait_for_blocked_update().await;
1420 let initial_generation = handler.updates()[0].generation;
1421
1422 filter.update(lit(false) as _).unwrap();
1424 handler.release_blocked_update();
1425
1426 handler.wait_for_update_count(2).await;
1427 let updates = handler.updates();
1428 assert!(updates[1].generation > initial_generation);
1429 assert_eq!(updates[1].region_id, subscriber.region_id());
1430 assert_eq!(updates[1].filter_id, filter_id.to_string());
1431
1432 drop(lease);
1433 handler.wait_for_unregister_count(1).await;
1434 wait_for_registry_drop(registry_weak).await;
1435 }
1436
1437 #[tokio::test]
1438 async fn fanout_task_unregisters_after_lifecycle_close_during_blocked_update() {
1439 let query_id = test_query_id(5);
1440 let manager = Arc::new(DynFilterRegistryManager::default());
1441 let lease = manager.acquire_lease(query_id);
1442 let registry_weak = Arc::downgrade(lease.registry.as_ref().unwrap());
1443 let filter = test_dyn_filter(&["host"]);
1444 let filter_id = test_filter_id(1);
1445 let _ = lease
1446 .registry()
1447 .register_remote_dyn_filter(filter_id.clone(), filter.clone());
1448 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1449 assert_eq!(
1450 lease
1451 .registry()
1452 .register_subscriber(&filter_id, subscriber.clone()),
1453 SubscriberRegistration::Added
1454 );
1455
1456 let handler = Arc::new(RecordingRegionQueryHandler::default());
1457 handler.block_next_update();
1458 lease.ensure_fanout_task(handler.clone() as RegionQueryHandlerRef);
1459
1460 handler.wait_for_blocked_update().await;
1461 drop(lease);
1462
1463 handler.wait_for_unregister_count(1).await;
1464 let unregisters = handler.unregisters();
1465 assert_eq!(unregisters[0].region_id, subscriber.region_id());
1466 assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1467 wait_for_registry_drop(registry_weak).await;
1468 }
1469
1470 #[tokio::test]
1471 async fn update_timeout_does_not_stop_fanout_for_other_subscribers() {
1472 let query_id = test_query_id(9);
1473 let registry = QueryDynFilterRegistry::new(query_id);
1474 let filter = test_dyn_filter(&["host"]);
1475 let filter_id = test_filter_id(1);
1476 let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
1477 EntryRegistration::Inserted(entry) => entry,
1478 other => panic!("unexpected registration result: {other:?}"),
1479 };
1480 let first_subscriber = Subscriber::new(RegionId::new(1024, 7));
1481 let second_subscriber = Subscriber::new(RegionId::new(1024, 8));
1482 assert_eq!(
1483 registry.register_subscriber(&filter_id, first_subscriber.clone()),
1484 SubscriberRegistration::Added
1485 );
1486 assert_eq!(
1487 registry.register_subscriber(&filter_id, second_subscriber.clone()),
1488 SubscriberRegistration::Added
1489 );
1490
1491 let handler = Arc::new(RecordingRegionQueryHandler::default());
1492 let handler_ref = handler.clone() as RegionQueryHandlerRef;
1493 let mut lifecycle_rx = registry.lifecycle_tx.subscribe();
1494 handler.block_next_update();
1495 assert!(
1496 fanout_snapshot_for_query(
1497 query_id,
1498 &handler_ref,
1499 &entry,
1500 filter.as_ref(),
1501 false,
1502 &mut lifecycle_rx,
1503 Duration::from_millis(100),
1504 )
1505 .await
1506 );
1507
1508 handler.wait_for_blocked_update().await;
1509 handler.wait_for_update_count(2).await;
1512
1513 let initial_updates = handler.updates();
1514 assert_eq!(
1515 initial_updates.len(),
1516 2,
1517 "the healthy subscriber must still receive the update after another subscriber times out"
1518 );
1519 assert!(
1520 initial_updates
1521 .iter()
1522 .any(|update| update.region_id == first_subscriber.region_id())
1523 );
1524 assert!(
1525 initial_updates
1526 .iter()
1527 .any(|update| update.region_id == second_subscriber.region_id())
1528 );
1529
1530 filter.update(lit(false) as _).unwrap();
1531 assert!(
1532 fanout_snapshot_for_query(
1533 query_id,
1534 &handler_ref,
1535 &entry,
1536 filter.as_ref(),
1537 false,
1538 &mut lifecycle_rx,
1539 Duration::from_millis(100),
1540 )
1541 .await
1542 );
1543 handler.wait_for_update_count(4).await;
1544 let updates = handler.updates();
1545 assert!(
1546 updates[2..]
1547 .iter()
1548 .any(|update| update.region_id == first_subscriber.region_id())
1549 );
1550 assert!(
1551 updates[2..]
1552 .iter()
1553 .any(|update| update.region_id == second_subscriber.region_id())
1554 );
1555
1556 registry.unregister_all_once(&handler_ref).await;
1557 handler.wait_for_unregister_count(1).await;
1558 }
1559
1560 #[tokio::test]
1561 async fn unregister_fanout_is_idempotent() {
1562 let query_id = test_query_id(2);
1563 let registry = QueryDynFilterRegistry::new(query_id);
1564 let filter = test_dyn_filter(&["host"]);
1565 let filter_id = test_filter_id(1);
1566 let _ = registry.register_remote_dyn_filter(filter_id.clone(), filter);
1567 let subscriber = Subscriber::new(RegionId::new(1024, 7));
1568 assert_eq!(
1569 registry.register_subscriber(&filter_id, subscriber.clone()),
1570 SubscriberRegistration::Added
1571 );
1572
1573 let handler = Arc::new(RecordingRegionQueryHandler::default());
1574 let handler_ref = handler.clone() as RegionQueryHandlerRef;
1575
1576 registry.unregister_all_once(&handler_ref).await;
1577 registry.unregister_all_once(&handler_ref).await;
1578
1579 let unregisters = handler.unregisters();
1580 assert_eq!(unregisters.len(), 1);
1581 assert_eq!(unregisters[0].region_id, subscriber.region_id());
1582 assert_eq!(unregisters[0].query_id, query_id.to_string());
1583 assert_eq!(unregisters[0].filter_id, filter_id.to_string());
1584 }
1585}