1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18use std::sync::{Arc, Mutex, OnceLock};
19
20use common_query::request::{
21 DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
22 InitialDynFilterRegs,
23};
24use common_telemetry::warn;
25use dashmap::DashMap;
26use datafusion::arrow::datatypes::{Schema, SchemaRef};
27use datafusion::execution::{SessionStateBuilder, TaskContext};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion::physical_plan::expressions::{DynamicFilterPhysicalExpr, lit};
30use datafusion_common::Result as DataFusionResult;
31use session::context::QueryContextRef;
32use session::query_id::QueryId;
33use store_api::storage::RegionId;
34
35pub(super) const REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES: usize = 64 * 1024;
36
37type QueryRemoteDynFilterRegs = HashMap<RemoteDynFilterId, RegisteredDynFilter>;
38
39#[derive(Debug, Default)]
40pub(super) struct RemoteDynFilterRegistry {
41 queries: DashMap<QueryId, Arc<Mutex<QueryRemoteDynFilterRegs>>>,
44}
45
46impl RemoteDynFilterRegistry {
47 pub(super) fn new() -> Self {
48 Self::default()
49 }
50
51 fn get_or_insert_query(&self, query_id: QueryId) -> Arc<Mutex<QueryRemoteDynFilterRegs>> {
52 self.queries
53 .entry(query_id)
54 .or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
55 .clone()
56 }
57
58 fn get_query(&self, query_id: &QueryId) -> Option<Arc<Mutex<QueryRemoteDynFilterRegs>>> {
59 self.queries
60 .get(query_id)
61 .map(|query_regs| query_regs.clone())
62 }
63
64 fn remove_query_if_empty(
65 &self,
66 query_id: &QueryId,
67 expected: &Arc<Mutex<QueryRemoteDynFilterRegs>>,
68 ) {
69 self.queries.remove_if(query_id, |_, query_regs| {
73 Arc::ptr_eq(query_regs, expected) && query_regs.lock().unwrap().is_empty()
74 });
75 }
76
77 #[cfg(test)]
78 pub(super) fn inspect_query<R>(
79 &self,
80 query_id: &QueryId,
81 inspect: impl FnOnce(&QueryRemoteDynFilterRegs) -> R,
82 ) -> Option<R> {
83 self.get_query(query_id)
84 .map(|query_regs| inspect(&query_regs.lock().unwrap()))
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, Hash)]
89pub(super) struct RemoteDynFilterId(String);
90
91impl RemoteDynFilterId {
92 pub(super) fn new(value: impl Into<String>) -> Self {
93 Self(value.into())
94 }
95}
96
97impl Display for RemoteDynFilterId {
98 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99 self.0.fmt(f)
100 }
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104pub(super) enum RemoteDynFilterUpdateOutcome {
105 MissingRegistration,
106 Buffered,
107 Applied,
108 Idempotent,
109 Stale,
110 AlreadyComplete,
111 PayloadTooLarge,
112 DecodeFailed,
113}
114
115#[derive(Debug, Clone)]
116struct PendingDynFilterUpdate {
117 payload: Vec<u8>,
118 generation: u64,
119 is_complete: bool,
120}
121
122impl PendingDynFilterUpdate {
123 fn from_initial_reg(reg: &InitialDynFilterReg) -> Option<Self> {
124 let snapshot = reg.initial_snapshot.as_ref()?;
125 match &snapshot.payload {
126 DynFilterPayload::Datafusion(payload) => Some(Self {
127 payload: payload.clone(),
128 generation: snapshot.generation,
129 is_complete: snapshot.is_complete,
130 }),
131 _ => None,
132 }
133 }
134}
135
136#[derive(Debug)]
137struct RemoteDynFilterEpochState {
138 generation: Option<u64>,
139 is_complete: bool,
140}
141
142#[derive(Debug)]
143struct RemoteDynFilterState {
144 filter: Arc<DynamicFilterPhysicalExpr>,
145 input_schema: SchemaRef,
146 epoch: Mutex<RemoteDynFilterEpochState>,
147}
148
149impl RemoteDynFilterState {
150 fn new(filter: Arc<DynamicFilterPhysicalExpr>, input_schema: SchemaRef) -> Self {
151 Self {
152 filter,
153 input_schema,
154 epoch: Mutex::new(RemoteDynFilterEpochState {
155 generation: None,
156 is_complete: false,
157 }),
158 }
159 }
160
161 fn filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
162 self.filter.clone()
163 }
164
165 fn apply_update(
166 &self,
167 payload: &[u8],
168 generation: u64,
169 is_complete: bool,
170 ) -> RemoteDynFilterUpdateOutcome {
171 if !validate_update_payload_size(payload) {
172 return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
173 }
174
175 let mut epoch = self.epoch.lock().unwrap();
176 if let Some(current_generation) = epoch.generation {
177 if generation < current_generation {
178 return RemoteDynFilterUpdateOutcome::Stale;
179 }
180
181 if generation == current_generation {
182 if is_complete && !epoch.is_complete {
183 self.filter.mark_complete();
184 epoch.is_complete = true;
185 return RemoteDynFilterUpdateOutcome::Applied;
186 }
187 return RemoteDynFilterUpdateOutcome::Idempotent;
188 }
189 }
190
191 if epoch.is_complete {
192 return RemoteDynFilterUpdateOutcome::AlreadyComplete;
193 }
194
195 let expr = match decode_update_payload(payload, self.input_schema.as_ref()) {
196 Ok(expr) => expr,
197 Err(error) => {
198 warn!(error; "Failed to decode remote dynamic filter update payload");
199 return RemoteDynFilterUpdateOutcome::DecodeFailed;
200 }
201 };
202
203 if let Err(error) = self.filter.update(expr) {
204 warn!(error; "Failed to apply remote dynamic filter update");
205 return RemoteDynFilterUpdateOutcome::DecodeFailed;
206 }
207
208 epoch.generation = Some(generation);
209 if is_complete {
210 self.filter.mark_complete();
211 epoch.is_complete = true;
212 }
213
214 RemoteDynFilterUpdateOutcome::Applied
215 }
216}
217
218#[derive(Debug)]
219pub(super) struct RegisteredDynFilter {
220 pub(super) filter_id: RemoteDynFilterId,
221 pub(super) child_exprs_datafusion_proto: Vec<Vec<u8>>,
222 pub(super) subscriber_regions: HashSet<RegionId>,
223 runtime: Option<Arc<RemoteDynFilterState>>,
224 pending_update: Option<PendingDynFilterUpdate>,
225}
226
227impl RegisteredDynFilter {
228 fn new(
229 filter_id: RemoteDynFilterId,
230 child_exprs_datafusion_proto: Vec<Vec<u8>>,
231 pending_update: Option<PendingDynFilterUpdate>,
232 region_id: RegionId,
233 ) -> Self {
234 let mut subscriber_regions = HashSet::new();
235 subscriber_regions.insert(region_id);
236
237 Self {
238 filter_id,
239 child_exprs_datafusion_proto,
240 subscriber_regions,
241 runtime: None,
242 pending_update,
243 }
244 }
245
246 fn apply_initial_snapshot(
247 &mut self,
248 reg: &InitialDynFilterReg,
249 ) -> RemoteDynFilterUpdateOutcome {
250 let Some(snapshot) = PendingDynFilterUpdate::from_initial_reg(reg) else {
251 return RemoteDynFilterUpdateOutcome::Idempotent;
252 };
253
254 self.apply_or_buffer_update(&snapshot.payload, snapshot.generation, snapshot.is_complete)
255 }
256
257 fn register_subscriber(&mut self, region_id: RegionId) -> bool {
258 if !self.subscriber_regions.insert(region_id) {
259 warn!(
260 "Duplicate remote dynamic filter subscriber region, filter_id: {}, region_id: {}",
261 self.filter_id, region_id
262 );
263 return false;
264 }
265
266 true
267 }
268
269 fn has_subscribers(&self) -> bool {
270 !self.subscriber_regions.is_empty()
271 }
272
273 fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool {
274 self.subscriber_regions.remove(®ion_id);
275 !self.has_subscribers()
276 }
277
278 fn buffer_update(
279 &mut self,
280 payload: &[u8],
281 generation: u64,
282 is_complete: bool,
283 ) -> RemoteDynFilterUpdateOutcome {
284 if !validate_update_payload_size(payload) {
285 return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
286 }
287
288 if let Some(pending) = self.pending_update.as_mut() {
289 if generation < pending.generation {
290 return RemoteDynFilterUpdateOutcome::Stale;
291 }
292
293 if generation == pending.generation {
294 pending.is_complete |= is_complete;
295 return RemoteDynFilterUpdateOutcome::Idempotent;
296 }
297
298 if pending.is_complete {
299 return RemoteDynFilterUpdateOutcome::AlreadyComplete;
300 }
301 }
302
303 self.pending_update = Some(PendingDynFilterUpdate {
304 payload: payload.to_vec(),
305 generation,
306 is_complete,
307 });
308 RemoteDynFilterUpdateOutcome::Buffered
309 }
310
311 fn apply_or_buffer_update(
312 &mut self,
313 payload: &[u8],
314 generation: u64,
315 is_complete: bool,
316 ) -> RemoteDynFilterUpdateOutcome {
317 if let Some(runtime) = &self.runtime {
318 return runtime.apply_update(payload, generation, is_complete);
319 }
320
321 self.buffer_update(payload, generation, is_complete)
322 }
323
324 fn decode_children(
325 &self,
326 input_schema: &Schema,
327 ) -> DataFusionResult<Vec<Arc<dyn PhysicalExpr>>> {
328 InitialDynFilterReg::new(
329 self.filter_id.to_string(),
330 self.child_exprs_datafusion_proto.clone(),
331 )
332 .decode_children(
333 remote_dyn_filter_task_context(),
334 input_schema,
335 REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES,
336 )
337 }
338
339 fn dyn_filter(&mut self, input_schema: &Schema) -> Option<Arc<dyn PhysicalExpr>> {
340 let children = match self.decode_children(input_schema) {
341 Ok(children) => children,
342 Err(error) => {
343 warn!(error; "Failed to decode remote dynamic filter initial children");
344 return None;
345 }
346 };
347
348 let runtime = match &self.runtime {
349 Some(runtime) => runtime.clone(),
350 None => {
351 let filter = Arc::new(DynamicFilterPhysicalExpr::new(children.clone(), lit(true)));
352 let runtime = Arc::new(RemoteDynFilterState::new(
353 filter,
354 Arc::new(input_schema.clone()),
355 ));
356 if let Some(pending) = self.pending_update.take() {
357 let outcome = runtime.apply_update(
358 &pending.payload,
359 pending.generation,
360 pending.is_complete,
361 );
362 if matches!(outcome, RemoteDynFilterUpdateOutcome::DecodeFailed) {
363 warn!(
364 "Dropped buffered remote dynamic filter update after decode failure, filter_id: {}, generation: {}",
365 self.filter_id, pending.generation
366 );
367 }
368 }
369 self.runtime = Some(runtime.clone());
370 runtime
371 }
372 };
373
374 match runtime.filter().with_new_children(children) {
375 Ok(expr) => Some(expr),
376 Err(error) => {
377 warn!(error; "Failed to remap remote dynamic filter children for scan");
378 None
379 }
380 }
381 }
382
383 fn deactivate(&self) {
384 if let Some(runtime) = &self.runtime {
385 runtime.filter.mark_complete();
386 }
387 }
388}
389
390impl Drop for RegisteredDynFilter {
391 fn drop(&mut self) {
392 self.deactivate();
393 }
394}
395
396pub(super) fn initial_dyn_filter_regs_from_query_ctx(
397 query_ctx: &QueryContextRef,
398) -> Option<InitialDynFilterRegs> {
399 let registrations =
400 query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?;
401 match InitialDynFilterRegs::from_extension_value(registrations) {
402 Ok(registrations) => match registrations.validate_default_bounds() {
403 Ok(()) => Some(registrations),
404 Err(error) => {
405 warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds");
406 None
407 }
408 },
409 Err(error) => {
410 warn!(error; "Failed to decode initial remote dyn filter registrations from query context");
411 None
412 }
413 }
414}
415
416pub(super) fn register_initial_dyn_filter_regs(
417 regs_by_query: &RemoteDynFilterRegistry,
418 query_id: &QueryId,
419 region_id: RegionId,
420 regs: &InitialDynFilterRegs,
421) -> Vec<RemoteDynFilterId> {
422 if regs.is_empty() {
423 return Vec::new();
424 }
425
426 if let Err(error) = regs.validate_default_bounds() {
427 warn!(error; "Ignored invalid initial dyn filter registrations for query_id {}", query_id);
428 return Vec::new();
429 }
430
431 let query_regs = regs_by_query.get_or_insert_query(*query_id);
432 let mut query_regs = query_regs.lock().unwrap();
433 let mut registered_filter_ids = Vec::with_capacity(regs.regs.len());
434
435 for reg in ®s.regs {
436 let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
437 match query_regs.entry(filter_id.clone()) {
438 Entry::Occupied(mut entry) => {
439 let registered = entry.get_mut();
440 if registered.child_exprs_datafusion_proto != reg.child_exprs_datafusion_proto {
441 warn!(
442 "Remote dynamic filter registration reused filter_id with different children, query_id: {}, filter_id: {}, region_id: {}",
443 query_id, filter_id, region_id
444 );
445 }
446 if registered.register_subscriber(region_id) {
447 registered_filter_ids.push(filter_id);
448 }
449 let _ = registered.apply_initial_snapshot(reg);
450 }
451 Entry::Vacant(entry) => {
452 entry.insert(RegisteredDynFilter::new(
453 filter_id.clone(),
454 reg.child_exprs_datafusion_proto.clone(),
455 PendingDynFilterUpdate::from_initial_reg(reg),
456 region_id,
457 ));
458 registered_filter_ids.push(filter_id);
459 }
460 }
461 }
462
463 registered_filter_ids
464}
465
466pub(super) fn remote_dyn_filter_exprs_for_initial_regs(
467 regs_by_query: &RemoteDynFilterRegistry,
468 query_id: &QueryId,
469 initial_regs: &InitialDynFilterRegs,
470 input_schema: &Schema,
471) -> Vec<Arc<dyn PhysicalExpr>> {
472 let Some(query_regs) = regs_by_query.get_query(query_id) else {
473 return Vec::new();
474 };
475
476 let mut query_regs = query_regs.lock().unwrap();
477 initial_regs
478 .regs
479 .iter()
480 .filter_map(|reg| {
481 let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
482 let registered = query_regs.get_mut(&filter_id)?;
483 registered.dyn_filter(input_schema)
484 })
485 .collect()
486}
487
488pub(super) fn apply_remote_dyn_filter_update(
489 regs_by_query: &RemoteDynFilterRegistry,
490 query_id: &QueryId,
491 filter_id: &RemoteDynFilterId,
492 payload: &[u8],
493 generation: u64,
494 is_complete: bool,
495) -> RemoteDynFilterUpdateOutcome {
496 if !validate_update_payload_size(payload) {
497 warn!(
498 "Ignored oversized remote dynamic filter update, query_id: {}, filter_id: {}, payload_size: {}, max_payload_size: {}",
499 query_id,
500 filter_id,
501 payload.len(),
502 REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
503 );
504 return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
505 }
506
507 let Some(query_regs) = regs_by_query.get_query(query_id) else {
508 warn!(
509 "Ignored remote dynamic filter update without query registration, query_id: {}, filter_id: {}",
510 query_id, filter_id
511 );
512 return RemoteDynFilterUpdateOutcome::MissingRegistration;
513 };
514
515 let mut query_regs = query_regs.lock().unwrap();
516 let Some(registered) = query_regs.get_mut(filter_id) else {
517 warn!(
518 "Ignored remote dynamic filter update without filter registration, query_id: {}, filter_id: {}",
519 query_id, filter_id
520 );
521 return RemoteDynFilterUpdateOutcome::MissingRegistration;
522 };
523
524 registered.apply_or_buffer_update(payload, generation, is_complete)
525}
526
527pub(super) fn unregister_remote_dyn_filter(
528 regs_by_query: &RemoteDynFilterRegistry,
529 query_id: &QueryId,
530 filter_id: &RemoteDynFilterId,
531) -> RemoteDynFilterUpdateOutcome {
532 let Some(query_regs) = regs_by_query.get_query(query_id) else {
533 warn!(
534 "Ignored remote dynamic filter unregister without query registration, query_id: {}, filter_id: {}",
535 query_id, filter_id
536 );
537 return RemoteDynFilterUpdateOutcome::MissingRegistration;
538 };
539
540 let (registered, should_remove_query) = {
541 let mut locked = query_regs.lock().unwrap();
542 let Some(registered) = locked.remove(filter_id) else {
543 warn!(
544 "Ignored remote dynamic filter unregister without filter registration, query_id: {}, filter_id: {}",
545 query_id, filter_id
546 );
547 return RemoteDynFilterUpdateOutcome::MissingRegistration;
548 };
549 let should_remove_query = locked.is_empty();
550 (registered, should_remove_query)
551 };
552
553 drop(registered);
554 if should_remove_query {
555 regs_by_query.remove_query_if_empty(query_id, &query_regs);
556 }
557
558 RemoteDynFilterUpdateOutcome::Applied
559}
560
561pub(super) fn remove_initial_dyn_filter_regs(
562 regs_by_query: &RemoteDynFilterRegistry,
563 query_id: &QueryId,
564 region_id: RegionId,
565 filter_ids: &[RemoteDynFilterId],
566) {
567 if filter_ids.is_empty() {
568 return;
569 }
570
571 let Some(query_regs) = regs_by_query.get_query(query_id) else {
572 return;
573 };
574
575 let (removed_filters, should_remove_query) = {
576 let mut locked = query_regs.lock().unwrap();
577 let mut removed_filters = Vec::new();
578
579 for filter_id in filter_ids {
580 let should_remove_filter = locked
581 .get_mut(filter_id)
582 .map(|registered| registered.should_drop_after_remove(region_id))
583 .unwrap_or(false);
584
585 if should_remove_filter && let Some(registered) = locked.remove(filter_id) {
586 removed_filters.push(registered);
587 }
588 }
589
590 let should_remove_query = locked.is_empty();
591 (removed_filters, should_remove_query)
592 };
593
594 drop(removed_filters);
595 if should_remove_query {
596 regs_by_query.remove_query_if_empty(query_id, &query_regs);
597 }
598}
599
600fn decode_update_payload(
601 payload: &[u8],
602 input_schema: &Schema,
603) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
604 DynFilterPayload::Datafusion(payload.to_vec()).decode_datafusion_expr(
605 remote_dyn_filter_task_context(),
606 input_schema,
607 REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES,
608 )
609}
610
611fn remote_dyn_filter_task_context() -> &'static TaskContext {
612 static TASK_CONTEXT: OnceLock<TaskContext> = OnceLock::new();
613
614 TASK_CONTEXT.get_or_init(|| {
615 let session_state = SessionStateBuilder::new().with_default_features().build();
620 TaskContext::from(&session_state)
621 })
622}
623
624fn validate_update_payload_size(payload: &[u8]) -> bool {
625 payload.len() <= REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
626}
627
#[cfg(test)]
mod tests {
    use super::*;

    /// Repeated lookups for the same query must hand out the same table, so all callers
    /// contend on a single lock.
    #[test]
    fn remote_dyn_filter_same_query_reuses_one_inner_lock() {
        let regs_by_query = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();

        let first = regs_by_query.get_or_insert_query(query_id);
        let second = regs_by_query.get_or_insert_query(query_id);

        assert!(Arc::ptr_eq(&first, &second));
    }

    /// A removal issued with a handle to an old table must not remove the table that was
    /// re-created for the same query id afterwards.
    #[test]
    fn remote_dyn_filter_stale_query_remove_does_not_remove_new_query_state() {
        let regs_by_query = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();
        let stale_query_regs = regs_by_query.get_or_insert_query(query_id);

        // The first removal targets the current, empty table and succeeds.
        regs_by_query.remove_query_if_empty(&query_id, &stale_query_regs);
        assert!(regs_by_query.get_query(&query_id).is_none());

        let new_query_regs = regs_by_query.get_or_insert_query(query_id);
        let filter_id = RemoteDynFilterId::new("filter-1");
        let region_id = RegionId::new(1024, 7);
        new_query_regs.lock().unwrap().insert(
            filter_id.clone(),
            RegisteredDynFilter::new(filter_id.clone(), vec![], None, region_id),
        );

        // The second removal still carries the old handle and must be a no-op.
        regs_by_query.remove_query_if_empty(&query_id, &stale_query_regs);

        let current_query_regs = regs_by_query.get_query(&query_id).unwrap();
        assert!(Arc::ptr_eq(&current_query_regs, &new_query_regs));
        assert_eq!(current_query_regs.lock().unwrap().len(), 1);
    }

    /// Registering the same filter id from two regions yields one registration with both
    /// regions subscribed.
    #[test]
    fn remote_dyn_filter_register_uses_entry_to_merge_same_filter_subscribers() {
        let regs_by_query = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();
        let first_region_id = RegionId::new(1024, 7);
        let second_region_id = RegionId::new(1024, 8);
        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);

        register_initial_dyn_filter_regs(&regs_by_query, &query_id, first_region_id, &regs);
        register_initial_dyn_filter_regs(&regs_by_query, &query_id, second_region_id, &regs);

        let query_regs = regs_by_query.get_query(&query_id).unwrap();
        let query_regs = query_regs.lock().unwrap();
        assert_eq!(query_regs.len(), 1);
        let registered = query_regs.get(&RemoteDynFilterId::new("filter-1")).unwrap();
        assert_eq!(registered.subscriber_regions.len(), 2);
        assert!(registered.subscriber_regions.contains(&first_region_id));
        assert!(registered.subscriber_regions.contains(&second_region_id));
    }
}