Skip to main content

datanode/region_server/
registrations.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18use std::sync::{Arc, Mutex, OnceLock};
19
20use common_query::request::{
21    DynFilterPayload, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
22    InitialDynFilterRegs,
23};
24use common_telemetry::warn;
25use dashmap::DashMap;
26use datafusion::arrow::datatypes::{Schema, SchemaRef};
27use datafusion::execution::{SessionStateBuilder, TaskContext};
28use datafusion::physical_plan::PhysicalExpr;
29use datafusion::physical_plan::expressions::{DynamicFilterPhysicalExpr, lit};
30use datafusion_common::Result as DataFusionResult;
31use session::context::QueryContextRef;
32use session::query_id::QueryId;
33use store_api::storage::RegionId;
34
35pub(super) const REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES: usize = 64 * 1024;
36
37type QueryRemoteDynFilterRegs = HashMap<RemoteDynFilterId, RegisteredDynFilter>;
38
39#[derive(Debug, Default)]
40pub(super) struct RemoteDynFilterRegistry {
41    // Keep cross-query concurrency while making each query's RDF state machine a
42    // single critical section. RDF count per query is small
43    queries: DashMap<QueryId, Arc<Mutex<QueryRemoteDynFilterRegs>>>,
44}
45
46impl RemoteDynFilterRegistry {
47    pub(super) fn new() -> Self {
48        Self::default()
49    }
50
51    fn get_or_insert_query(&self, query_id: QueryId) -> Arc<Mutex<QueryRemoteDynFilterRegs>> {
52        self.queries
53            .entry(query_id)
54            .or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
55            .clone()
56    }
57
58    fn get_query(&self, query_id: &QueryId) -> Option<Arc<Mutex<QueryRemoteDynFilterRegs>>> {
59        self.queries
60            .get(query_id)
61            .map(|query_regs| query_regs.clone())
62    }
63
64    fn remove_query_if_empty(
65        &self,
66        query_id: &QueryId,
67        expected: &Arc<Mutex<QueryRemoteDynFilterRegs>>,
68    ) {
69        // Protect against stale cleanup of an old per-query state: remove the
70        // outer entry only if it still points to the same inner mutex and that
71        // inner map is still empty.
72        self.queries.remove_if(query_id, |_, query_regs| {
73            Arc::ptr_eq(query_regs, expected) && query_regs.lock().unwrap().is_empty()
74        });
75    }
76
77    #[cfg(test)]
78    pub(super) fn inspect_query<R>(
79        &self,
80        query_id: &QueryId,
81        inspect: impl FnOnce(&QueryRemoteDynFilterRegs) -> R,
82    ) -> Option<R> {
83        self.get_query(query_id)
84            .map(|query_regs| inspect(&query_regs.lock().unwrap()))
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, Hash)]
89pub(super) struct RemoteDynFilterId(String);
90
91impl RemoteDynFilterId {
92    pub(super) fn new(value: impl Into<String>) -> Self {
93        Self(value.into())
94    }
95}
96
97impl Display for RemoteDynFilterId {
98    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99        self.0.fmt(f)
100    }
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104pub(super) enum RemoteDynFilterUpdateOutcome {
105    MissingRegistration,
106    Buffered,
107    Applied,
108    Idempotent,
109    Stale,
110    AlreadyComplete,
111    PayloadTooLarge,
112    DecodeFailed,
113}
114
115#[derive(Debug, Clone)]
116struct PendingDynFilterUpdate {
117    payload: Vec<u8>,
118    generation: u64,
119    is_complete: bool,
120}
121
122impl PendingDynFilterUpdate {
123    fn from_initial_reg(reg: &InitialDynFilterReg) -> Option<Self> {
124        let snapshot = reg.initial_snapshot.as_ref()?;
125        match &snapshot.payload {
126            DynFilterPayload::Datafusion(payload) => Some(Self {
127                payload: payload.clone(),
128                generation: snapshot.generation,
129                is_complete: snapshot.is_complete,
130            }),
131            _ => None,
132        }
133    }
134}
135
136#[derive(Debug)]
137struct RemoteDynFilterEpochState {
138    generation: Option<u64>,
139    is_complete: bool,
140}
141
142#[derive(Debug)]
143struct RemoteDynFilterState {
144    filter: Arc<DynamicFilterPhysicalExpr>,
145    input_schema: SchemaRef,
146    epoch: Mutex<RemoteDynFilterEpochState>,
147}
148
149impl RemoteDynFilterState {
150    fn new(filter: Arc<DynamicFilterPhysicalExpr>, input_schema: SchemaRef) -> Self {
151        Self {
152            filter,
153            input_schema,
154            epoch: Mutex::new(RemoteDynFilterEpochState {
155                generation: None,
156                is_complete: false,
157            }),
158        }
159    }
160
161    fn filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
162        self.filter.clone()
163    }
164
165    fn apply_update(
166        &self,
167        payload: &[u8],
168        generation: u64,
169        is_complete: bool,
170    ) -> RemoteDynFilterUpdateOutcome {
171        if !validate_update_payload_size(payload) {
172            return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
173        }
174
175        let mut epoch = self.epoch.lock().unwrap();
176        if let Some(current_generation) = epoch.generation {
177            if generation < current_generation {
178                return RemoteDynFilterUpdateOutcome::Stale;
179            }
180
181            if generation == current_generation {
182                if is_complete && !epoch.is_complete {
183                    self.filter.mark_complete();
184                    epoch.is_complete = true;
185                    return RemoteDynFilterUpdateOutcome::Applied;
186                }
187                return RemoteDynFilterUpdateOutcome::Idempotent;
188            }
189        }
190
191        if epoch.is_complete {
192            return RemoteDynFilterUpdateOutcome::AlreadyComplete;
193        }
194
195        let expr = match decode_update_payload(payload, self.input_schema.as_ref()) {
196            Ok(expr) => expr,
197            Err(error) => {
198                warn!(error; "Failed to decode remote dynamic filter update payload");
199                return RemoteDynFilterUpdateOutcome::DecodeFailed;
200            }
201        };
202
203        if let Err(error) = self.filter.update(expr) {
204            warn!(error; "Failed to apply remote dynamic filter update");
205            return RemoteDynFilterUpdateOutcome::DecodeFailed;
206        }
207
208        epoch.generation = Some(generation);
209        if is_complete {
210            self.filter.mark_complete();
211            epoch.is_complete = true;
212        }
213
214        RemoteDynFilterUpdateOutcome::Applied
215    }
216}
217
218#[derive(Debug)]
219pub(super) struct RegisteredDynFilter {
220    pub(super) filter_id: RemoteDynFilterId,
221    pub(super) child_exprs_datafusion_proto: Vec<Vec<u8>>,
222    pub(super) subscriber_regions: HashSet<RegionId>,
223    runtime: Option<Arc<RemoteDynFilterState>>,
224    pending_update: Option<PendingDynFilterUpdate>,
225}
226
227impl RegisteredDynFilter {
228    fn new(
229        filter_id: RemoteDynFilterId,
230        child_exprs_datafusion_proto: Vec<Vec<u8>>,
231        pending_update: Option<PendingDynFilterUpdate>,
232        region_id: RegionId,
233    ) -> Self {
234        let mut subscriber_regions = HashSet::new();
235        subscriber_regions.insert(region_id);
236
237        Self {
238            filter_id,
239            child_exprs_datafusion_proto,
240            subscriber_regions,
241            runtime: None,
242            pending_update,
243        }
244    }
245
246    fn apply_initial_snapshot(
247        &mut self,
248        reg: &InitialDynFilterReg,
249    ) -> RemoteDynFilterUpdateOutcome {
250        let Some(snapshot) = PendingDynFilterUpdate::from_initial_reg(reg) else {
251            return RemoteDynFilterUpdateOutcome::Idempotent;
252        };
253
254        self.apply_or_buffer_update(&snapshot.payload, snapshot.generation, snapshot.is_complete)
255    }
256
257    fn register_subscriber(&mut self, region_id: RegionId) -> bool {
258        if !self.subscriber_regions.insert(region_id) {
259            warn!(
260                "Duplicate remote dynamic filter subscriber region, filter_id: {}, region_id: {}",
261                self.filter_id, region_id
262            );
263            return false;
264        }
265
266        true
267    }
268
269    fn has_subscribers(&self) -> bool {
270        !self.subscriber_regions.is_empty()
271    }
272
273    fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool {
274        self.subscriber_regions.remove(&region_id);
275        !self.has_subscribers()
276    }
277
278    fn buffer_update(
279        &mut self,
280        payload: &[u8],
281        generation: u64,
282        is_complete: bool,
283    ) -> RemoteDynFilterUpdateOutcome {
284        if !validate_update_payload_size(payload) {
285            return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
286        }
287
288        if let Some(pending) = self.pending_update.as_mut() {
289            if generation < pending.generation {
290                return RemoteDynFilterUpdateOutcome::Stale;
291            }
292
293            if generation == pending.generation {
294                pending.is_complete |= is_complete;
295                return RemoteDynFilterUpdateOutcome::Idempotent;
296            }
297
298            if pending.is_complete {
299                return RemoteDynFilterUpdateOutcome::AlreadyComplete;
300            }
301        }
302
303        self.pending_update = Some(PendingDynFilterUpdate {
304            payload: payload.to_vec(),
305            generation,
306            is_complete,
307        });
308        RemoteDynFilterUpdateOutcome::Buffered
309    }
310
311    fn apply_or_buffer_update(
312        &mut self,
313        payload: &[u8],
314        generation: u64,
315        is_complete: bool,
316    ) -> RemoteDynFilterUpdateOutcome {
317        if let Some(runtime) = &self.runtime {
318            return runtime.apply_update(payload, generation, is_complete);
319        }
320
321        self.buffer_update(payload, generation, is_complete)
322    }
323
324    fn decode_children(
325        &self,
326        input_schema: &Schema,
327    ) -> DataFusionResult<Vec<Arc<dyn PhysicalExpr>>> {
328        InitialDynFilterReg::new(
329            self.filter_id.to_string(),
330            self.child_exprs_datafusion_proto.clone(),
331        )
332        .decode_children(
333            remote_dyn_filter_task_context(),
334            input_schema,
335            REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES,
336        )
337    }
338
339    fn dyn_filter(&mut self, input_schema: &Schema) -> Option<Arc<dyn PhysicalExpr>> {
340        let children = match self.decode_children(input_schema) {
341            Ok(children) => children,
342            Err(error) => {
343                warn!(error; "Failed to decode remote dynamic filter initial children");
344                return None;
345            }
346        };
347
348        let runtime = match &self.runtime {
349            Some(runtime) => runtime.clone(),
350            None => {
351                let filter = Arc::new(DynamicFilterPhysicalExpr::new(children.clone(), lit(true)));
352                let runtime = Arc::new(RemoteDynFilterState::new(
353                    filter,
354                    Arc::new(input_schema.clone()),
355                ));
356                if let Some(pending) = self.pending_update.take() {
357                    let outcome = runtime.apply_update(
358                        &pending.payload,
359                        pending.generation,
360                        pending.is_complete,
361                    );
362                    if matches!(outcome, RemoteDynFilterUpdateOutcome::DecodeFailed) {
363                        warn!(
364                            "Dropped buffered remote dynamic filter update after decode failure, filter_id: {}, generation: {}",
365                            self.filter_id, pending.generation
366                        );
367                    }
368                }
369                self.runtime = Some(runtime.clone());
370                runtime
371            }
372        };
373
374        match runtime.filter().with_new_children(children) {
375            Ok(expr) => Some(expr),
376            Err(error) => {
377                warn!(error; "Failed to remap remote dynamic filter children for scan");
378                None
379            }
380        }
381    }
382
383    fn deactivate(&self) {
384        if let Some(runtime) = &self.runtime {
385            runtime.filter.mark_complete();
386        }
387    }
388}
389
390impl Drop for RegisteredDynFilter {
391    fn drop(&mut self) {
392        self.deactivate();
393    }
394}
395
396pub(super) fn initial_dyn_filter_regs_from_query_ctx(
397    query_ctx: &QueryContextRef,
398) -> Option<InitialDynFilterRegs> {
399    let registrations =
400        query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?;
401    match InitialDynFilterRegs::from_extension_value(registrations) {
402        Ok(registrations) => match registrations.validate_default_bounds() {
403            Ok(()) => Some(registrations),
404            Err(error) => {
405                warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds");
406                None
407            }
408        },
409        Err(error) => {
410            warn!(error; "Failed to decode initial remote dyn filter registrations from query context");
411            None
412        }
413    }
414}
415
416pub(super) fn register_initial_dyn_filter_regs(
417    regs_by_query: &RemoteDynFilterRegistry,
418    query_id: &QueryId,
419    region_id: RegionId,
420    regs: &InitialDynFilterRegs,
421) -> Vec<RemoteDynFilterId> {
422    if regs.is_empty() {
423        return Vec::new();
424    }
425
426    if let Err(error) = regs.validate_default_bounds() {
427        warn!(error; "Ignored invalid initial dyn filter registrations for query_id {}", query_id);
428        return Vec::new();
429    }
430
431    let query_regs = regs_by_query.get_or_insert_query(*query_id);
432    let mut query_regs = query_regs.lock().unwrap();
433    let mut registered_filter_ids = Vec::with_capacity(regs.regs.len());
434
435    for reg in &regs.regs {
436        let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
437        match query_regs.entry(filter_id.clone()) {
438            Entry::Occupied(mut entry) => {
439                let registered = entry.get_mut();
440                if registered.child_exprs_datafusion_proto != reg.child_exprs_datafusion_proto {
441                    warn!(
442                        "Remote dynamic filter registration reused filter_id with different children, query_id: {}, filter_id: {}, region_id: {}",
443                        query_id, filter_id, region_id
444                    );
445                }
446                if registered.register_subscriber(region_id) {
447                    registered_filter_ids.push(filter_id);
448                }
449                let _ = registered.apply_initial_snapshot(reg);
450            }
451            Entry::Vacant(entry) => {
452                entry.insert(RegisteredDynFilter::new(
453                    filter_id.clone(),
454                    reg.child_exprs_datafusion_proto.clone(),
455                    PendingDynFilterUpdate::from_initial_reg(reg),
456                    region_id,
457                ));
458                registered_filter_ids.push(filter_id);
459            }
460        }
461    }
462
463    registered_filter_ids
464}
465
466pub(super) fn remote_dyn_filter_exprs_for_initial_regs(
467    regs_by_query: &RemoteDynFilterRegistry,
468    query_id: &QueryId,
469    initial_regs: &InitialDynFilterRegs,
470    input_schema: &Schema,
471) -> Vec<Arc<dyn PhysicalExpr>> {
472    let Some(query_regs) = regs_by_query.get_query(query_id) else {
473        return Vec::new();
474    };
475
476    let mut query_regs = query_regs.lock().unwrap();
477    initial_regs
478        .regs
479        .iter()
480        .filter_map(|reg| {
481            let filter_id = RemoteDynFilterId::new(reg.filter_id.clone());
482            let registered = query_regs.get_mut(&filter_id)?;
483            registered.dyn_filter(input_schema)
484        })
485        .collect()
486}
487
488pub(super) fn apply_remote_dyn_filter_update(
489    regs_by_query: &RemoteDynFilterRegistry,
490    query_id: &QueryId,
491    filter_id: &RemoteDynFilterId,
492    payload: &[u8],
493    generation: u64,
494    is_complete: bool,
495) -> RemoteDynFilterUpdateOutcome {
496    if !validate_update_payload_size(payload) {
497        warn!(
498            "Ignored oversized remote dynamic filter update, query_id: {}, filter_id: {}, payload_size: {}, max_payload_size: {}",
499            query_id,
500            filter_id,
501            payload.len(),
502            REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
503        );
504        return RemoteDynFilterUpdateOutcome::PayloadTooLarge;
505    }
506
507    let Some(query_regs) = regs_by_query.get_query(query_id) else {
508        warn!(
509            "Ignored remote dynamic filter update without query registration, query_id: {}, filter_id: {}",
510            query_id, filter_id
511        );
512        return RemoteDynFilterUpdateOutcome::MissingRegistration;
513    };
514
515    let mut query_regs = query_regs.lock().unwrap();
516    let Some(registered) = query_regs.get_mut(filter_id) else {
517        warn!(
518            "Ignored remote dynamic filter update without filter registration, query_id: {}, filter_id: {}",
519            query_id, filter_id
520        );
521        return RemoteDynFilterUpdateOutcome::MissingRegistration;
522    };
523
524    registered.apply_or_buffer_update(payload, generation, is_complete)
525}
526
527pub(super) fn unregister_remote_dyn_filter(
528    regs_by_query: &RemoteDynFilterRegistry,
529    query_id: &QueryId,
530    filter_id: &RemoteDynFilterId,
531) -> RemoteDynFilterUpdateOutcome {
532    let Some(query_regs) = regs_by_query.get_query(query_id) else {
533        warn!(
534            "Ignored remote dynamic filter unregister without query registration, query_id: {}, filter_id: {}",
535            query_id, filter_id
536        );
537        return RemoteDynFilterUpdateOutcome::MissingRegistration;
538    };
539
540    let (registered, should_remove_query) = {
541        let mut locked = query_regs.lock().unwrap();
542        let Some(registered) = locked.remove(filter_id) else {
543            warn!(
544                "Ignored remote dynamic filter unregister without filter registration, query_id: {}, filter_id: {}",
545                query_id, filter_id
546            );
547            return RemoteDynFilterUpdateOutcome::MissingRegistration;
548        };
549        let should_remove_query = locked.is_empty();
550        (registered, should_remove_query)
551    };
552
553    drop(registered);
554    if should_remove_query {
555        regs_by_query.remove_query_if_empty(query_id, &query_regs);
556    }
557
558    RemoteDynFilterUpdateOutcome::Applied
559}
560
561pub(super) fn remove_initial_dyn_filter_regs(
562    regs_by_query: &RemoteDynFilterRegistry,
563    query_id: &QueryId,
564    region_id: RegionId,
565    filter_ids: &[RemoteDynFilterId],
566) {
567    if filter_ids.is_empty() {
568        return;
569    }
570
571    let Some(query_regs) = regs_by_query.get_query(query_id) else {
572        return;
573    };
574
575    let (removed_filters, should_remove_query) = {
576        let mut locked = query_regs.lock().unwrap();
577        let mut removed_filters = Vec::new();
578
579        for filter_id in filter_ids {
580            let should_remove_filter = locked
581                .get_mut(filter_id)
582                .map(|registered| registered.should_drop_after_remove(region_id))
583                .unwrap_or(false);
584
585            if should_remove_filter && let Some(registered) = locked.remove(filter_id) {
586                removed_filters.push(registered);
587            }
588        }
589
590        let should_remove_query = locked.is_empty();
591        (removed_filters, should_remove_query)
592    };
593
594    drop(removed_filters);
595    if should_remove_query {
596        regs_by_query.remove_query_if_empty(query_id, &query_regs);
597    }
598}
599
600fn decode_update_payload(
601    payload: &[u8],
602    input_schema: &Schema,
603) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
604    DynFilterPayload::Datafusion(payload.to_vec()).decode_datafusion_expr(
605        remote_dyn_filter_task_context(),
606        input_schema,
607        REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES,
608    )
609}
610
611fn remote_dyn_filter_task_context() -> &'static TaskContext {
612    static TASK_CONTEXT: OnceLock<TaskContext> = OnceLock::new();
613
614    TASK_CONTEXT.get_or_init(|| {
615        // RDF payloads can contain DataFusion built-in scalar functions. For
616        // example, multi-column join dynamic filters use `struct(...) IN (...)`.
617        // `TaskContext::default()` has an empty function registry and cannot
618        // decode those expressions.
619        let session_state = SessionStateBuilder::new().with_default_features().build();
620        TaskContext::from(&session_state)
621    })
622}
623
624fn validate_update_payload_size(payload: &[u8]) -> bool {
625    payload.len() <= REMOTE_DYN_FILTER_PAYLOAD_MAX_BYTES
626}
627
#[cfg(test)]
mod tests {
    use super::*;

    /// Asking twice for the same query must hand out the same inner mutex.
    #[test]
    fn remote_dyn_filter_same_query_reuses_one_inner_lock() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();

        let handles = [
            registry.get_or_insert_query(query_id),
            registry.get_or_insert_query(query_id),
        ];

        assert!(Arc::ptr_eq(&handles[0], &handles[1]));
    }

    /// A cleanup issued with a handle to an old per-query map must leave a
    /// newer map registered under the same query id untouched.
    #[test]
    fn remote_dyn_filter_stale_query_remove_does_not_remove_new_query_state() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();

        // First incarnation: empty, so the cleanup removes it.
        let stale = registry.get_or_insert_query(query_id);
        registry.remove_query_if_empty(&query_id, &stale);
        assert!(registry.get_query(&query_id).is_none());

        // Second incarnation holding one filter.
        let fresh = registry.get_or_insert_query(query_id);
        let filter_id = RemoteDynFilterId::new("filter-1");
        let region_id = RegionId::new(1024, 7);
        let filter = RegisteredDynFilter::new(filter_id.clone(), vec![], None, region_id);
        fresh.lock().unwrap().insert(filter_id.clone(), filter);

        // Cleaning up with the stale handle must be a no-op.
        registry.remove_query_if_empty(&query_id, &stale);

        let current = registry.get_query(&query_id).unwrap();
        assert!(Arc::ptr_eq(&current, &fresh));
        assert_eq!(current.lock().unwrap().len(), 1);
    }

    /// Registering the same filter from two regions yields a single entry
    /// with both regions as subscribers.
    #[test]
    fn remote_dyn_filter_register_uses_entry_to_merge_same_filter_subscribers() {
        let registry = RemoteDynFilterRegistry::new();
        let query_id = QueryId::new();
        let region_a = RegionId::new(1024, 7);
        let region_b = RegionId::new(1024, 8);
        let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new("filter-1", vec![])]);

        for region_id in [region_a, region_b] {
            register_initial_dyn_filter_regs(&registry, &query_id, region_id, &regs);
        }

        let query_regs = registry.get_query(&query_id).unwrap();
        let locked = query_regs.lock().unwrap();
        assert_eq!(locked.len(), 1);

        let registered = locked.get(&RemoteDynFilterId::new("filter-1")).unwrap();
        assert_eq!(registered.subscriber_regions.len(), 2);
        assert!(registered.subscriber_regions.contains(&region_a));
        assert!(registered.subscriber_regions.contains(&region_b));
    }
}
682        assert_eq!(registered.subscriber_regions.len(), 2);
683        assert!(registered.subscriber_regions.contains(&first_region_id));
684        assert!(registered.subscriber_regions.contains(&second_region_id));
685    }
686}