1use std::pin::Pin;
16use std::sync::{Arc, Weak};
17use std::task::{Context, Poll};
18
19use api::region::RegionResponse;
20use api::v1::region::RemoteDynFilterRequest;
21use api::v1::region::remote_dyn_filter_request::Action;
22use common_base::Plugins;
23use common_recordbatch::adapter::RecordBatchMetrics;
24use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
25use common_telemetry::{debug, warn};
26use datafusion_expr::LogicalPlan;
27use futures_util::Stream;
28use query::dist_plan::{
29 RemoteDynFilterReceiverInjector, RemoteDynFilterReceiverInjectorRef,
30 RemoteDynFilterReceiverLogicalPlan,
31};
32use query::options::remote_dyn_filter_pushdown_enabled_from_extensions;
33use query::promql::plan_contains_promql_extension;
34use session::context::QueryContextRef;
35use session::query_id::QueryId;
36use snafu::OptionExt;
37use store_api::storage::RegionId;
38
39use crate::error::{self, Result, UnexpectedSnafu};
40use crate::region_server::registrations::{
41 RemoteDynFilterId, RemoteDynFilterUpdateOutcome, apply_remote_dyn_filter_update,
42 initial_dyn_filter_regs_from_query_ctx, register_initial_dyn_filter_regs,
43 remote_dyn_filter_exprs_for_initial_regs, remove_initial_dyn_filter_regs,
44 unregister_remote_dyn_filter,
45};
46use crate::region_server::{RegionServer, RegionServerInner};
47
48fn remote_dyn_filter_receiver_plan(
49 server: &Weak<RegionServerInner>,
50 origin: LogicalPlan,
51 ctx: QueryContextRef,
52) -> LogicalPlan {
53 if !remote_dyn_filter_pushdown_enabled(&ctx) {
54 return origin;
55 }
56
57 if plan_contains_promql_extension(&origin) {
58 return origin;
59 }
60
61 let Some(query_id) = ctx.remote_query_id_value() else {
62 return origin;
63 };
64
65 let Some(initial_regs) = initial_dyn_filter_regs_from_query_ctx(&ctx) else {
66 return origin;
67 };
68
69 let Some(server) = server.upgrade() else {
70 return origin;
71 };
72
73 let dyn_filters = remote_dyn_filter_exprs_for_initial_regs(
74 &server.initial_remote_dyn_filter_registrations,
75 &query_id,
76 &initial_regs,
77 origin.schema().as_arrow(),
78 );
79
80 if dyn_filters.is_empty() {
81 return origin;
82 }
83
84 RemoteDynFilterReceiverLogicalPlan::new(origin, dyn_filters).into_logical_plan()
85}
86
87fn remote_dyn_filter_pushdown_enabled(query_ctx: &QueryContextRef) -> bool {
88 match remote_dyn_filter_pushdown_enabled_from_extensions(&query_ctx.extensions()) {
89 Ok(enabled) => enabled,
90 Err(error) => {
91 warn!(error; "Remote dynamic filter pushdown is disabled because the query option is invalid");
92 false
93 }
94 }
95}
96
97impl RegionServer {
98 pub fn install_remote_dyn_filter_receiver_injector(&self, plugins: &Plugins) {
99 let server = Arc::downgrade(&self.inner);
100 plugins.insert::<RemoteDynFilterReceiverInjectorRef>(Arc::new(
101 RemoteDynFilterReceiverInjector::new(move |origin, ctx| {
102 remote_dyn_filter_receiver_plan(&server, origin, ctx)
103 }),
104 ));
105 }
106
107 pub(super) fn register_initial_remote_dyn_filter_cleanup(
108 &self,
109 query_ctx: &QueryContextRef,
110 region_id: RegionId,
111 ) -> Option<RemoteDynFilterRegistrationGuard> {
112 if !remote_dyn_filter_pushdown_enabled(query_ctx) {
113 return None;
114 }
115
116 let initial_dyn_filter_regs = initial_dyn_filter_regs_from_query_ctx(query_ctx);
117 let query_id = query_ctx.remote_query_id_value();
118 let registered_filter_ids = if let (Some(query_id), Some(regs)) =
119 (query_id.as_ref(), initial_dyn_filter_regs.as_ref())
120 {
121 register_initial_dyn_filter_regs(
122 &self.inner.initial_remote_dyn_filter_registrations,
123 query_id,
124 region_id,
125 regs,
126 )
127 } else {
128 Vec::new()
129 };
130
131 match (query_id, registered_filter_ids.is_empty()) {
132 (Some(query_id), false) => Some(RemoteDynFilterRegistrationGuard::new(
133 self.clone(),
134 query_id,
135 region_id,
136 registered_filter_ids,
137 )),
138 _ => None,
139 }
140 }
141
142 pub(super) async fn handle_remote_dyn_filter_request(
143 &self,
144 request: &RemoteDynFilterRequest,
145 ) -> Result<RegionResponse> {
146 if request.query_id.is_empty() {
147 return error::MissingRequiredFieldSnafu { name: "query_id" }.fail();
148 }
149
150 let query_id = request.query_id.parse::<QueryId>().map_err(|_| {
151 UnexpectedSnafu {
152 violated: "remote dynamic filter query_id must be a valid QueryId",
153 }
154 .build()
155 })?;
156
157 match request
158 .action
159 .as_ref()
160 .context(error::MissingRequiredFieldSnafu { name: "action" })?
161 {
162 Action::Update(update) => {
163 self.handle_remote_dyn_filter_update(&query_id, update)
164 .await
165 }
166 Action::Unregister(unregister) => {
167 self.handle_remote_dyn_filter_unregister(&query_id, unregister)
168 .await
169 }
170 }
171 }
172
173 async fn handle_remote_dyn_filter_update(
174 &self,
175 query_id: &QueryId,
176 request: &api::v1::region::RemoteDynFilterUpdate,
177 ) -> Result<RegionResponse> {
178 if request.filter_id.is_empty() {
179 return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
180 }
181
182 if request.payload.is_empty() {
183 return error::MissingRequiredFieldSnafu { name: "payload" }.fail();
184 }
185
186 let filter_id = RemoteDynFilterId::new(request.filter_id.clone());
187 let outcome = apply_remote_dyn_filter_update(
188 &self.inner.initial_remote_dyn_filter_registrations,
189 query_id,
190 &filter_id,
191 &request.payload,
192 request.generation,
193 request.is_complete,
194 );
195 self.log_remote_dyn_filter_update_outcome(query_id, &filter_id, outcome);
196
197 Ok(RegionResponse::new(0))
198 }
199
200 async fn handle_remote_dyn_filter_unregister(
201 &self,
202 query_id: &QueryId,
203 request: &api::v1::region::RemoteDynFilterUnregister,
204 ) -> Result<RegionResponse> {
205 if request.filter_id.is_empty() {
206 return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
207 }
208
209 let filter_id = RemoteDynFilterId::new(request.filter_id.clone());
210 let outcome = unregister_remote_dyn_filter(
211 &self.inner.initial_remote_dyn_filter_registrations,
212 query_id,
213 &filter_id,
214 );
215 self.log_remote_dyn_filter_update_outcome(query_id, &filter_id, outcome);
216
217 Ok(RegionResponse::new(0))
218 }
219
220 fn log_remote_dyn_filter_update_outcome(
221 &self,
222 query_id: &QueryId,
223 filter_id: &RemoteDynFilterId,
224 outcome: RemoteDynFilterUpdateOutcome,
225 ) {
226 if matches!(
227 outcome,
228 RemoteDynFilterUpdateOutcome::MissingRegistration
229 | RemoteDynFilterUpdateOutcome::AlreadyComplete
230 | RemoteDynFilterUpdateOutcome::PayloadTooLarge
231 | RemoteDynFilterUpdateOutcome::DecodeFailed
232 ) {
233 warn!(
234 "Remote dynamic filter update outcome, query_id: {}, filter_id: {}, outcome: {:?}",
235 query_id, filter_id, outcome
236 );
237 } else {
238 debug!(
239 "Remote dynamic filter update outcome, query_id: {}, filter_id: {}, outcome: {:?}",
240 query_id, filter_id, outcome
241 );
242 }
243 }
244}
245
246pub(super) fn wrap_remote_dyn_filter_guarded_stream(
247 stream: SendableRecordBatchStream,
248 cleanup: RemoteDynFilterRegistrationGuard,
249) -> SendableRecordBatchStream {
250 Box::pin(RemoteDynFilterGuardedStream { stream, cleanup })
251}
252
/// Guard for the remote dynamic filter registrations made on behalf of one region of one
/// query; cleaning it up (explicitly or on drop) removes those registrations.
pub(super) struct RemoteDynFilterRegistrationGuard {
    /// Region server whose registry holds the registrations.
    server: RegionServer,
    /// Query the registrations belong to.
    query_id: QueryId,
    /// Region the registrations were made for.
    region_id: RegionId,
    /// Filters that were registered and must be removed on cleanup.
    filter_ids: Vec<RemoteDynFilterId>,
    /// Set once cleanup has run so that it happens at most once.
    cleaned: bool,
}
261
262impl RemoteDynFilterRegistrationGuard {
263 fn new(
264 server: RegionServer,
265 query_id: QueryId,
266 region_id: RegionId,
267 filter_ids: Vec<RemoteDynFilterId>,
268 ) -> Self {
269 Self {
270 server,
271 query_id,
272 region_id,
273 filter_ids,
274 cleaned: false,
275 }
276 }
277
278 fn cleanup_once(&mut self) {
279 if self.cleaned {
280 return;
281 }
282
283 remove_initial_dyn_filter_regs(
284 &self.server.inner.initial_remote_dyn_filter_registrations,
285 &self.query_id,
286 self.region_id,
287 &self.filter_ids,
288 );
289 self.cleaned = true;
290 }
291}
292
293impl Drop for RemoteDynFilterRegistrationGuard {
294 fn drop(&mut self) {
295 self.cleanup_once();
296 }
297}
298
/// Record batch stream paired with the registration guard that is cleaned up when the
/// stream ends.
struct RemoteDynFilterGuardedStream {
    /// Wrapped stream that all stream calls are forwarded to.
    stream: SendableRecordBatchStream,
    /// Guard cleaned up once `stream` yields its end; its own `Drop` covers early drops.
    cleanup: RemoteDynFilterRegistrationGuard,
}
304
305impl RecordBatchStream for RemoteDynFilterGuardedStream {
306 fn name(&self) -> &str {
307 self.stream.name()
308 }
309
310 fn schema(&self) -> datatypes::schema::SchemaRef {
311 self.stream.schema()
312 }
313
314 fn output_ordering(&self) -> Option<&[OrderOption]> {
315 self.stream.output_ordering()
316 }
317
318 fn metrics(&self) -> Option<RecordBatchMetrics> {
319 self.stream.metrics()
320 }
321}
322
323impl Stream for RemoteDynFilterGuardedStream {
324 type Item = common_recordbatch::error::Result<RecordBatch>;
325
326 fn size_hint(&self) -> (usize, Option<usize>) {
327 self.stream.size_hint()
328 }
329
330 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
331 match Pin::new(&mut self.stream).poll_next(cx) {
332 Poll::Ready(None) => {
333 self.cleanup.cleanup_once();
334 Poll::Ready(None)
335 }
336 other => other,
337 }
338 }
339}
340
341#[cfg(test)]
342mod tests {
343 use std::assert_matches;
344 use std::collections::{HashMap, HashSet};
345 use std::sync::Arc;
346 use std::time::Duration;
347
348 use api::v1::region::{
349 RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
350 remote_dyn_filter_request,
351 };
352 use common_query::request::{
353 DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
354 InitialDynFilterReg, InitialDynFilterRegs, InitialDynFilterSnapshot,
355 };
356 use common_recordbatch::RecordBatches;
357 use datafusion::arrow::datatypes::Schema as ArrowSchema;
358 use datafusion::physical_plan::PhysicalExpr;
359 use datafusion::physical_plan::expressions::{DynamicFilterPhysicalExpr, lit as physical_lit};
360 use datafusion_common::DFSchema;
361 use datafusion_expr::EmptyRelation;
362 use datatypes::prelude::{ConcreteDataType, VectorRef};
363 use datatypes::schema::{ColumnSchema, Schema};
364 use datatypes::vectors::Int32Vector;
365 use futures_util::StreamExt;
366 use query::options::QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN;
367 use session::context::QueryContext;
368 use session::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
369 use session::query_id::QueryId;
370 use store_api::storage::RegionId;
371
372 use super::*;
373 use crate::region_server::registrations::{
374 REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES, RemoteDynFilterRegistry,
375 };
376 use crate::tests::mock_region_server;
377
    /// Owned copy of the fields of one registry entry that the tests assert on.
    #[derive(Debug, Clone)]
    struct RegisteredDynFilterSnapshot {
        /// Filter id stored in the entry.
        filter_id: RemoteDynFilterId,
        /// Child expression payloads stored in the entry.
        child_exprs_datafusion_proto: Vec<Vec<u8>>,
        /// Regions subscribed to the filter.
        subscriber_regions: HashSet<RegionId>,
    }
384
385 fn query_regs(
386 regs_by_query: &RemoteDynFilterRegistry,
387 query_id: &QueryId,
388 ) -> Option<HashMap<RemoteDynFilterId, RegisteredDynFilterSnapshot>> {
389 regs_by_query.inspect_query(query_id, |query_regs| {
390 query_regs
391 .iter()
392 .map(|(filter_id, registered)| {
393 (
394 filter_id.clone(),
395 RegisteredDynFilterSnapshot {
396 filter_id: registered.filter_id.clone(),
397 child_exprs_datafusion_proto: registered
398 .child_exprs_datafusion_proto
399 .clone(),
400 subscriber_regions: registered.subscriber_regions.clone(),
401 },
402 )
403 })
404 .collect()
405 })
406 }
407
    /// Returns a freshly created query id for a test.
    fn test_remote_query_id() -> QueryId {
        QueryId::new()
    }
411
    /// Returns the fixed region id used by tests that need only one region.
    fn test_remote_dyn_filter_region_id() -> RegionId {
        RegionId::new(1024, 7)
    }
415
416 fn single_value_stream() -> common_recordbatch::SendableRecordBatchStream {
417 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
418 "v",
419 ConcreteDataType::int32_datatype(),
420 false,
421 )]));
422 let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
423 let batch = common_recordbatch::RecordBatch::new(schema.clone(), vec![values]).unwrap();
424 RecordBatches::try_new(schema, vec![batch])
425 .unwrap()
426 .as_stream()
427 }
428
429 fn empty_logical_plan() -> LogicalPlan {
430 LogicalPlan::EmptyRelation(EmptyRelation {
431 produce_one_row: false,
432 schema: Arc::new(DFSchema::empty()),
433 })
434 }
435
436 fn query_context_with_initial_regs(query_id: QueryId) -> QueryContext {
437 let mut query_ctx = QueryContext::with("greptime", "public");
438 query_ctx.set_extension(REMOTE_QUERY_ID_EXTENSION_KEY, query_id.to_string());
439 query_ctx.set_extension(
440 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
441 InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])])
442 .to_extension_value()
443 .unwrap(),
444 );
445 query_ctx
446 }
447
448 #[test]
449 fn remote_dyn_filter_receiver_plan_respects_disabled_query_option() {
450 let mock_region_server = mock_region_server();
451 let query_id = test_remote_query_id();
452 let mut query_ctx = query_context_with_initial_regs(query_id);
453 query_ctx.set_extension(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "false");
454
455 let plan = remote_dyn_filter_receiver_plan(
456 &Arc::downgrade(&mock_region_server.inner),
457 empty_logical_plan(),
458 Arc::new(query_ctx),
459 );
460
461 assert!(matches!(plan, LogicalPlan::EmptyRelation(_)));
462 }
463
464 #[test]
465 fn remote_dyn_filter_cleanup_registration_respects_disabled_query_option() {
466 let mock_region_server = mock_region_server();
467 let query_id = test_remote_query_id();
468 let mut query_ctx = query_context_with_initial_regs(query_id);
469 query_ctx.set_extension(QUERY_ENABLE_REMOTE_DYNAMIC_FILTER_PUSHDOWN, "false");
470 let query_ctx = Arc::new(query_ctx);
471
472 let cleanup = mock_region_server.register_initial_remote_dyn_filter_cleanup(
473 &query_ctx,
474 test_remote_dyn_filter_region_id(),
475 );
476
477 assert!(cleanup.is_none());
478 assert!(
479 mock_region_server
480 .inner
481 .initial_remote_dyn_filter_registrations
482 .inspect_query(&query_id, |_| ())
483 .is_none()
484 );
485 }
486
487 #[test]
488 fn initial_dyn_filter_regs_can_be_read_from_query_context() {
489 let mut query_ctx = QueryContext::with("greptime", "public");
490 query_ctx.set_extension(
491 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
492 InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
493 "filter-1",
494 vec![vec![1, 2, 3]],
495 )])
496 .to_extension_value()
497 .unwrap(),
498 );
499
500 let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx)).unwrap();
501
502 assert_eq!(regs.regs.len(), 1);
503 assert_eq!(regs.regs[0].filter_id, "filter-1");
504 }
505
506 #[test]
507 fn initial_dyn_filter_regs_from_query_context_rejects_duplicate_filter_ids() {
508 let mut query_ctx = QueryContext::with("greptime", "public");
509 query_ctx.set_extension(
510 INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
511 InitialDynFilterRegs::new(vec![
512 InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
513 InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
514 ])
515 .to_extension_value()
516 .unwrap(),
517 );
518
519 let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx));
520
521 assert!(regs.is_none());
522 }
523
524 #[test]
525 fn register_initial_dyn_filter_regs_creates_query_scoped_entries() {
526 let regs_by_query = RemoteDynFilterRegistry::new();
527 let regs = InitialDynFilterRegs::new(vec![
528 InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
529 InitialDynFilterReg::new("filter-2", vec![vec![4, 5, 6]]),
530 ]);
531 let query_id = test_remote_query_id();
532 let region_id = test_remote_dyn_filter_region_id();
533
534 let registered_filter_ids =
535 register_initial_dyn_filter_regs(®s_by_query, &query_id, region_id, ®s);
536
537 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
538 assert_eq!(query_regs.len(), 2);
539 assert_eq!(
540 registered_filter_ids,
541 vec![
542 RemoteDynFilterId::new("filter-1"),
543 RemoteDynFilterId::new("filter-2")
544 ]
545 );
546 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
547 assert_eq!(registered.filter_id, RemoteDynFilterId::new("filter-1"));
548 assert_eq!(registered.child_exprs_datafusion_proto, vec![vec![1, 2, 3]]);
549 assert_eq!(registered.subscriber_regions.len(), 1);
550 assert!(registered.subscriber_regions.contains(®ion_id));
551 }
552
553 #[test]
554 fn register_initial_dyn_filter_regs_same_region_duplicate_is_idempotent() {
555 let regs_by_query = RemoteDynFilterRegistry::new();
556 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
557 "filter-1",
558 vec![vec![1, 2, 3]],
559 )]);
560 let query_id = test_remote_query_id();
561 let region_id = test_remote_dyn_filter_region_id();
562
563 let first = register_initial_dyn_filter_regs(®s_by_query, &query_id, region_id, ®s);
564 let duplicate =
565 register_initial_dyn_filter_regs(®s_by_query, &query_id, region_id, ®s);
566
567 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
568 assert_eq!(query_regs.len(), 1);
569 assert_eq!(first, vec![RemoteDynFilterId::new("filter-1")]);
570 assert!(duplicate.is_empty());
571 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
572 assert_eq!(registered.subscriber_regions.len(), 1);
573 assert!(registered.subscriber_regions.contains(®ion_id));
574 }
575
576 #[test]
577 fn register_initial_dyn_filter_regs_ignores_invalid_duplicate_payload_set() {
578 let regs_by_query = RemoteDynFilterRegistry::new();
579 let regs = InitialDynFilterRegs::new(vec![
580 InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
581 InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
582 ]);
583 let query_id = test_remote_query_id();
584 let region_id = test_remote_dyn_filter_region_id();
585
586 register_initial_dyn_filter_regs(®s_by_query, &query_id, region_id, ®s);
587
588 assert!(query_regs(®s_by_query, &query_id).is_none());
589 }
590
591 #[test]
592 fn register_initial_dyn_filter_regs_tracks_different_region_subscribers_for_same_filter() {
593 let regs_by_query = RemoteDynFilterRegistry::new();
594 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
595 "filter-1",
596 vec![vec![1, 2, 3]],
597 )]);
598 let query_id = test_remote_query_id();
599 let first_region_id = RegionId::new(1024, 7);
600 let second_region_id = RegionId::new(1024, 8);
601
602 register_initial_dyn_filter_regs(®s_by_query, &query_id, first_region_id, ®s);
603 register_initial_dyn_filter_regs(®s_by_query, &query_id, second_region_id, ®s);
604
605 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
606 assert_eq!(query_regs.len(), 1);
607 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
608 assert_eq!(registered.subscriber_regions.len(), 2);
609 assert!(registered.subscriber_regions.contains(&first_region_id));
610 assert!(registered.subscriber_regions.contains(&second_region_id));
611 }
612
613 #[test]
614 fn remove_initial_dyn_filter_regs_removes_registered_filter_entries() {
615 let regs_by_query = RemoteDynFilterRegistry::new();
616 let query_id = test_remote_query_id();
617 let other_query_id = test_remote_query_id();
618 let region_id = test_remote_dyn_filter_region_id();
619
620 let registered_filter_ids = register_initial_dyn_filter_regs(
621 ®s_by_query,
622 &query_id,
623 region_id,
624 &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
625 "filter-1",
626 vec![vec![1, 2, 3]],
627 )]),
628 );
629 register_initial_dyn_filter_regs(
630 ®s_by_query,
631 &other_query_id,
632 region_id,
633 &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
634 "filter-2",
635 vec![vec![4, 5, 6]],
636 )]),
637 );
638
639 remove_initial_dyn_filter_regs(
640 ®s_by_query,
641 &query_id,
642 region_id,
643 ®istered_filter_ids,
644 );
645
646 assert!(query_regs(®s_by_query, &query_id).is_none());
647 let other_query_regs = query_regs(®s_by_query, &other_query_id).unwrap();
648 assert_eq!(other_query_regs.len(), 1);
649 }
650
651 #[test]
652 fn remove_initial_dyn_filter_regs_keeps_other_subscribers() {
653 let regs_by_query = RemoteDynFilterRegistry::new();
654 let query_id = test_remote_query_id();
655 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
656 "filter-1",
657 vec![vec![1, 2, 3]],
658 )]);
659 let first_region_id = RegionId::new(1024, 7);
660 let second_region_id = RegionId::new(1024, 8);
661
662 let first_subscription =
663 register_initial_dyn_filter_regs(®s_by_query, &query_id, first_region_id, ®s);
664 register_initial_dyn_filter_regs(®s_by_query, &query_id, second_region_id, ®s);
665
666 remove_initial_dyn_filter_regs(
667 ®s_by_query,
668 &query_id,
669 first_region_id,
670 &first_subscription,
671 );
672
673 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
674 assert_eq!(query_regs.len(), 1);
675 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
676 assert_eq!(registered.subscriber_regions.len(), 1);
677 assert!(registered.subscriber_regions.contains(&second_region_id));
678 }
679
680 #[tokio::test]
681 async fn test_handle_remote_dyn_filter_request_requires_query_id() {
682 let mock_region_server = mock_region_server();
683
684 let err = mock_region_server
685 .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
686 query_id: String::new(),
687 action: Some(remote_dyn_filter_request::Action::Unregister(
688 RemoteDynFilterUnregister {
689 filter_id: "filter-1".to_string(),
690 },
691 )),
692 })
693 .await
694 .unwrap_err();
695
696 assert_matches!(
697 err,
698 crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id"
699 );
700 }
701
702 #[tokio::test]
703 async fn test_handle_remote_dyn_filter_request_requires_action() {
704 let mock_region_server = mock_region_server();
705
706 let err = mock_region_server
707 .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
708 query_id: test_remote_query_id().to_string(),
709 action: None,
710 })
711 .await
712 .unwrap_err();
713
714 assert_matches!(
715 err,
716 crate::error::Error::MissingRequiredField { ref name, .. } if name == "action"
717 );
718 }
719
720 #[tokio::test]
721 async fn test_handle_remote_dyn_filter_update_requires_filter_id() {
722 let mock_region_server = mock_region_server();
723
724 let err = mock_region_server
725 .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
726 query_id: test_remote_query_id().to_string(),
727 action: Some(remote_dyn_filter_request::Action::Update(
728 RemoteDynFilterUpdate {
729 filter_id: String::new(),
730 payload: vec![1],
731 generation: 1,
732 is_complete: false,
733 },
734 )),
735 })
736 .await
737 .unwrap_err();
738
739 assert_matches!(
740 err,
741 crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id"
742 );
743 }
744
745 #[tokio::test]
746 async fn test_handle_remote_dyn_filter_update_requires_payload() {
747 let mock_region_server = mock_region_server();
748
749 let err = mock_region_server
750 .handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
751 query_id: test_remote_query_id().to_string(),
752 action: Some(remote_dyn_filter_request::Action::Update(
753 RemoteDynFilterUpdate {
754 filter_id: "filter-1".to_string(),
755 payload: Vec::new(),
756 generation: 1,
757 is_complete: false,
758 },
759 )),
760 })
761 .await
762 .unwrap_err();
763
764 assert_matches!(
765 err,
766 crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload"
767 );
768 }
769
770 #[test]
771 fn test_apply_remote_dyn_filter_update_missing_registration() {
772 let regs_by_query = RemoteDynFilterRegistry::new();
773 let query_id = test_remote_query_id();
774 let filter_id = RemoteDynFilterId::new("filter-1");
775
776 let outcome =
777 apply_remote_dyn_filter_update(®s_by_query, &query_id, &filter_id, &[1], 1, false);
778 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::MissingRegistration);
779
780 let outcome = unregister_remote_dyn_filter(®s_by_query, &query_id, &filter_id);
781 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::MissingRegistration);
782 }
783
784 #[test]
785 fn test_apply_remote_dyn_filter_update_buffering_before_scan() {
786 let regs_by_query = RemoteDynFilterRegistry::new();
787 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
788 "filter-1",
789 vec![vec![1, 2, 3]],
790 )]);
791 let query_id = test_remote_query_id();
792 let filter_id = RemoteDynFilterId::new("filter-1");
793
794 register_initial_dyn_filter_regs(
795 ®s_by_query,
796 &query_id,
797 test_remote_dyn_filter_region_id(),
798 ®s,
799 );
800
801 let outcome =
803 apply_remote_dyn_filter_update(®s_by_query, &query_id, &filter_id, &[1], 1, false);
804 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
805
806 let outcome =
808 apply_remote_dyn_filter_update(®s_by_query, &query_id, &filter_id, &[2], 0, false);
809 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Stale);
810
811 let outcome =
813 apply_remote_dyn_filter_update(®s_by_query, &query_id, &filter_id, &[3], 1, false);
814 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Idempotent);
815 }
816
817 fn datafusion_payload_bytes(expr: Arc<dyn PhysicalExpr>) -> Vec<u8> {
818 match DynFilterPayload::from_datafusion_expr(&expr, REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES)
819 .unwrap()
820 {
821 DynFilterPayload::Datafusion(bytes) => bytes,
822 _ => unreachable!(
823 "DynFilterPayload::from_datafusion_expr only returns datafusion payloads"
824 ),
825 }
826 }
827
    /// Returns an Arrow schema without any fields.
    fn empty_arrow_schema() -> ArrowSchema {
        ArrowSchema::empty()
    }
831
832 fn register_empty_remote_dyn_filter(
833 regs_by_query: &RemoteDynFilterRegistry,
834 query_id: &QueryId,
835 ) -> Vec<RemoteDynFilterId> {
836 register_empty_remote_dyn_filter_for_region(
837 regs_by_query,
838 query_id,
839 test_remote_dyn_filter_region_id(),
840 )
841 }
842
843 fn register_empty_remote_dyn_filter_for_region(
844 regs_by_query: &RemoteDynFilterRegistry,
845 query_id: &QueryId,
846 region_id: RegionId,
847 ) -> Vec<RemoteDynFilterId> {
848 register_initial_dyn_filter_regs(
849 regs_by_query,
850 query_id,
851 region_id,
852 &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]),
853 )
854 }
855
856 #[test]
857 fn initial_remote_dyn_filter_snapshot_initializes_runtime_filter() {
858 let regs_by_query = RemoteDynFilterRegistry::new();
859 let query_id = test_remote_query_id();
860 let payload = DynFilterPayload::Datafusion(datafusion_payload_bytes(physical_lit(false)));
861 let regs = InitialDynFilterRegs::new(vec![
862 InitialDynFilterReg::new("filter-1", vec![])
863 .with_initial_snapshot(InitialDynFilterSnapshot::new(payload, 7, false)),
864 ]);
865
866 register_initial_dyn_filter_regs(
867 ®s_by_query,
868 &query_id,
869 test_remote_dyn_filter_region_id(),
870 ®s,
871 );
872 let exprs = remote_dyn_filter_exprs_for_initial_regs(
873 ®s_by_query,
874 &query_id,
875 ®s,
876 &empty_arrow_schema(),
877 );
878
879 assert_eq!(exprs.len(), 1);
880 assert_eq!(format!("{}", exprs[0]), "DynamicFilter [ false ]");
881 }
882
883 fn only_remote_dyn_filter(
884 regs_by_query: &RemoteDynFilterRegistry,
885 query_id: &QueryId,
886 ) -> Arc<DynamicFilterPhysicalExpr> {
887 let initial_regs =
888 InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
889 let exprs = remote_dyn_filter_exprs_for_initial_regs(
890 regs_by_query,
891 query_id,
892 &initial_regs,
893 &empty_arrow_schema(),
894 );
895 assert_eq!(1, exprs.len());
896 let expr = exprs.into_iter().next().unwrap();
897 let expr = expr as Arc<dyn std::any::Any + Send + Sync>;
898 expr.downcast::<DynamicFilterPhysicalExpr>().unwrap()
899 }
900
901 #[test]
902 fn test_remote_dyn_filter_rejects_oversized_payload_before_buffering() {
903 let regs_by_query = RemoteDynFilterRegistry::new();
904 let query_id = test_remote_query_id();
905 let filter_id = RemoteDynFilterId::new("filter-1");
906 register_empty_remote_dyn_filter(®s_by_query, &query_id);
907
908 let oversized = vec![0; REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES + 1];
909 let outcome = apply_remote_dyn_filter_update(
910 ®s_by_query,
911 &query_id,
912 &filter_id,
913 &oversized,
914 1,
915 false,
916 );
917 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::PayloadTooLarge);
918
919 let outcome =
921 apply_remote_dyn_filter_update(®s_by_query, &query_id, &filter_id, &[1], 0, false);
922 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
923 }
924
925 #[test]
926 fn test_remote_dyn_filter_generation_zero_applies_as_first_update() {
927 let regs_by_query = RemoteDynFilterRegistry::new();
928 let query_id = test_remote_query_id();
929 let filter_id = RemoteDynFilterId::new("filter-1");
930 register_empty_remote_dyn_filter(®s_by_query, &query_id);
931
932 let dyn_filter = only_remote_dyn_filter(®s_by_query, &query_id);
934 let payload = datafusion_payload_bytes(physical_lit(false));
935 let outcome = apply_remote_dyn_filter_update(
936 ®s_by_query,
937 &query_id,
938 &filter_id,
939 &payload,
940 0,
941 false,
942 );
943
944 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
945 assert!(dyn_filter.snapshot_generation() > 1);
946 assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
947 }
948
949 #[test]
950 fn test_buffered_update_applies_when_scan_installs_wrapper() {
951 let regs_by_query = RemoteDynFilterRegistry::new();
952 let query_id = test_remote_query_id();
953 let filter_id = RemoteDynFilterId::new("filter-1");
954 register_empty_remote_dyn_filter(®s_by_query, &query_id);
955
956 let payload = datafusion_payload_bytes(physical_lit(false));
957 let outcome = apply_remote_dyn_filter_update(
958 ®s_by_query,
959 &query_id,
960 &filter_id,
961 &payload,
962 1,
963 false,
964 );
965 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Buffered);
966
967 let dyn_filter = only_remote_dyn_filter(®s_by_query, &query_id);
968 assert!(dyn_filter.snapshot_generation() > 1);
969 assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
970 }
971
972 #[tokio::test]
973 async fn test_unregister_completes_installed_remote_dyn_filter_without_relaxing() {
974 let regs_by_query = RemoteDynFilterRegistry::new();
975 let query_id = test_remote_query_id();
976 let filter_id = RemoteDynFilterId::new("filter-1");
977 register_empty_remote_dyn_filter(®s_by_query, &query_id);
978
979 let dyn_filter = only_remote_dyn_filter(®s_by_query, &query_id);
980 let payload = datafusion_payload_bytes(physical_lit(false));
981 let outcome = apply_remote_dyn_filter_update(
982 ®s_by_query,
983 &query_id,
984 &filter_id,
985 &payload,
986 1,
987 false,
988 );
989 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
990 assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
991
992 let outcome = unregister_remote_dyn_filter(®s_by_query, &query_id, &filter_id);
993 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
994 assert!(query_regs(®s_by_query, &query_id).is_none());
995 tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
996 .await
997 .unwrap();
998 assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
999 }
1000
1001 #[tokio::test]
1002 async fn test_remote_dyn_filter_unregister_removes_all_region_subscribers_for_filter() {
1003 let regs_by_query = RemoteDynFilterRegistry::new();
1004 let query_id = test_remote_query_id();
1005 let filter_id = RemoteDynFilterId::new("filter-1");
1006 let first_region_id = RegionId::new(1024, 7);
1007 let second_region_id = RegionId::new(1024, 8);
1008 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
1009
1010 let first_subscription =
1011 register_initial_dyn_filter_regs(®s_by_query, &query_id, first_region_id, ®s);
1012 let second_subscription =
1013 register_initial_dyn_filter_regs(®s_by_query, &query_id, second_region_id, ®s);
1014 let dyn_filter = only_remote_dyn_filter(®s_by_query, &query_id);
1015
1016 let payload = datafusion_payload_bytes(physical_lit(false));
1017 let outcome = apply_remote_dyn_filter_update(
1018 ®s_by_query,
1019 &query_id,
1020 &filter_id,
1021 &payload,
1022 1,
1023 false,
1024 );
1025 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
1026
1027 let outcome = unregister_remote_dyn_filter(®s_by_query, &query_id, &filter_id);
1028 assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);
1029 assert!(query_regs(®s_by_query, &query_id).is_none());
1030 tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
1031 .await
1032 .unwrap();
1033 assert_eq!(format!("{}", dyn_filter.current().unwrap()), "false");
1034
1035 remove_initial_dyn_filter_regs(
1037 ®s_by_query,
1038 &query_id,
1039 first_region_id,
1040 &first_subscription,
1041 );
1042 remove_initial_dyn_filter_regs(
1043 ®s_by_query,
1044 &query_id,
1045 second_region_id,
1046 &second_subscription,
1047 );
1048 assert!(query_regs(®s_by_query, &query_id).is_none());
1049 }
1050
/// Unregistering one filter must leave the query's other filters registered.
///
/// `filter-1` and `filter-2` are registered for the same query and region; after
/// `filter-1` is unregistered the query entry must still exist and contain only
/// `filter-2`.
#[test]
fn test_remote_dyn_filter_unregister_keeps_other_filters_for_same_query() {
    let regs_by_query = RemoteDynFilterRegistry::new();
    let query_id = test_remote_query_id();
    let region_id = test_remote_dyn_filter_region_id();

    register_initial_dyn_filter_regs(
        &regs_by_query,
        &query_id,
        region_id,
        &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]),
    );
    register_initial_dyn_filter_regs(
        &regs_by_query,
        &query_id,
        region_id,
        &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-2", vec![])]),
    );

    let outcome = unregister_remote_dyn_filter(
        &regs_by_query,
        &query_id,
        &RemoteDynFilterId::new("filter-1"),
    );
    assert_eq!(outcome, RemoteDynFilterUpdateOutcome::Applied);

    // The local binding shadows the `query_regs` helper from here on; that is fine
    // because the helper is not called again in this test.
    let query_regs = query_regs(&regs_by_query, &query_id).unwrap();
    assert!(!query_regs.contains_key(&RemoteDynFilterId::new("filter-1")));
    assert!(query_regs.contains_key(&RemoteDynFilterId::new("filter-2")));
}
1081
1082 #[tokio::test]
1083 async fn test_remote_dyn_filter_guarded_stream_removes_on_eof() {
1084 let mock_region_server = mock_region_server();
1085 let query_id = test_remote_query_id();
1086 let region_id = test_remote_dyn_filter_region_id();
1087 let registered_filter_ids = register_empty_remote_dyn_filter(
1088 &mock_region_server
1089 .inner
1090 .initial_remote_dyn_filter_registrations,
1091 &query_id,
1092 );
1093 let cleanup = RemoteDynFilterRegistrationGuard::new(
1094 mock_region_server.clone(),
1095 query_id,
1096 region_id,
1097 registered_filter_ids,
1098 );
1099
1100 let stream = wrap_remote_dyn_filter_guarded_stream(single_value_stream(), cleanup);
1101 let mut pinned = Box::pin(stream);
1102 while pinned.next().await.is_some() {}
1103
1104 assert!(
1105 mock_region_server
1106 .inner
1107 .initial_remote_dyn_filter_registrations
1108 .inspect_query(&query_id, |_| ())
1109 .is_none()
1110 );
1111 }
1112
1113 #[tokio::test]
1114 async fn test_remote_dyn_filter_multi_region_subscription_cleanup() {
1115 let regs_by_query = RemoteDynFilterRegistry::new();
1116 let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);
1117 let query_id = test_remote_query_id();
1118 let first_region_id = RegionId::new(1024, 7);
1119 let second_region_id = RegionId::new(1024, 8);
1120
1121 let first_subscription =
1123 register_initial_dyn_filter_regs(®s_by_query, &query_id, first_region_id, ®s);
1124 let second_subscription =
1125 register_initial_dyn_filter_regs(®s_by_query, &query_id, second_region_id, ®s);
1126 let dyn_filter = only_remote_dyn_filter(®s_by_query, &query_id);
1127
1128 {
1130 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
1131 assert_eq!(query_regs.len(), 1);
1132 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
1133 assert_eq!(registered.subscriber_regions.len(), 2);
1134 assert!(registered.subscriber_regions.contains(&first_region_id));
1135 assert!(registered.subscriber_regions.contains(&second_region_id));
1136 }
1137
1138 remove_initial_dyn_filter_regs(
1140 ®s_by_query,
1141 &query_id,
1142 first_region_id,
1143 &first_subscription,
1144 );
1145 assert!(query_regs(®s_by_query, &query_id).is_some());
1146 assert!(
1147 tokio::time::timeout(Duration::from_millis(50), dyn_filter.wait_complete())
1148 .await
1149 .is_err()
1150 );
1151 {
1152 let query_regs = query_regs(®s_by_query, &query_id).unwrap();
1153 let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
1154 assert_eq!(registered.subscriber_regions.len(), 1);
1155 assert!(registered.subscriber_regions.contains(&second_region_id));
1156 }
1157
1158 remove_initial_dyn_filter_regs(
1160 ®s_by_query,
1161 &query_id,
1162 second_region_id,
1163 &second_subscription,
1164 );
1165 assert!(query_regs(®s_by_query, &query_id).is_none());
1166 tokio::time::timeout(Duration::from_secs(1), dyn_filter.wait_complete())
1167 .await
1168 .unwrap();
1169 }
1170}