Skip to main content

query/dist_plan/
remote_dyn_filter_receiver.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt;
17use std::hash::{Hash, Hasher};
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use datafusion::common::Result;
22use datafusion::execution::context::SessionState;
23use datafusion::physical_expr::utils::conjunction;
24use datafusion::physical_plan::ExecutionPlan;
25use datafusion::physical_plan::expressions::Column;
26use datafusion::physical_plan::filter::FilterExec;
27use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
28use datafusion_common::{DFSchemaRef, DataFusionError};
29use datafusion_expr::{
30    Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
31};
32use datafusion_physical_expr::PhysicalExpr;
33use datafusion_physical_expr::utils::collect_columns;
34use session::context::QueryContextRef;
35
36type InjectRemoteDynFilterReceiver =
37    dyn Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static;
38
39/// Injects a logical remote dynamic filter receiver into a query plan.
40pub struct RemoteDynFilterReceiverInjector {
41    inject: Box<InjectRemoteDynFilterReceiver>,
42}
43
44impl RemoteDynFilterReceiverInjector {
45    pub fn new(
46        inject: impl Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static,
47    ) -> Self {
48        Self {
49            inject: Box::new(inject),
50        }
51    }
52
53    pub fn maybe_inject(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> LogicalPlan {
54        (self.inject)(plan, query_ctx)
55    }
56}
57
58pub type RemoteDynFilterReceiverInjectorRef = Arc<RemoteDynFilterReceiverInjector>;
59
60/// A logical marker that is converted to a [`FilterExec`] carrying remote dynamic filters.
61#[derive(Clone)]
62pub struct RemoteDynFilterReceiverLogicalPlan {
63    input: Arc<LogicalPlan>,
64    dyn_filters: Vec<Arc<dyn PhysicalExpr>>,
65}
66
67impl RemoteDynFilterReceiverLogicalPlan {
68    pub fn new(input: LogicalPlan, dyn_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
69        Self {
70            input: Arc::new(input),
71            dyn_filters,
72        }
73    }
74
75    pub fn name() -> &'static str {
76        "RemoteDynFilterReceiver"
77    }
78
79    pub fn into_logical_plan(self) -> LogicalPlan {
80        LogicalPlan::Extension(Extension {
81            node: Arc::new(self),
82        })
83    }
84
85    fn dyn_filters(&self) -> &[Arc<dyn PhysicalExpr>] {
86        &self.dyn_filters
87    }
88
89    fn ord_key(&self) -> String {
90        format!("input={:?}, dyn_filters={:?}", self.input, self.dyn_filters)
91    }
92}
93
94impl fmt::Debug for RemoteDynFilterReceiverLogicalPlan {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
97    }
98}
99
100impl Hash for RemoteDynFilterReceiverLogicalPlan {
101    fn hash<H: Hasher>(&self, state: &mut H) {
102        self.input.hash(state);
103        self.dyn_filters.hash(state);
104    }
105}
106
107impl PartialEq for RemoteDynFilterReceiverLogicalPlan {
108    fn eq(&self, other: &Self) -> bool {
109        self.input == other.input && self.dyn_filters == other.dyn_filters
110    }
111}
112
113impl Eq for RemoteDynFilterReceiverLogicalPlan {}
114
115impl PartialOrd for RemoteDynFilterReceiverLogicalPlan {
116    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
117        Some(self.ord_key().cmp(&other.ord_key()))
118    }
119}
120
121impl UserDefinedLogicalNodeCore for RemoteDynFilterReceiverLogicalPlan {
122    fn name(&self) -> &str {
123        Self::name()
124    }
125
126    fn inputs(&self) -> Vec<&LogicalPlan> {
127        vec![self.input.as_ref()]
128    }
129
130    fn schema(&self) -> &DFSchemaRef {
131        self.input.schema()
132    }
133
134    fn expressions(&self) -> Vec<Expr> {
135        Vec::new()
136    }
137
138    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
139        let mut required = output_columns.to_vec();
140
141        for filter in &self.dyn_filters {
142            required.extend(
143                collect_columns(filter)
144                    .into_iter()
145                    .map(|column| column.index()),
146            );
147        }
148
149        required.sort_unstable();
150        required.dedup();
151        Some(vec![required])
152    }
153
154    fn fmt_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        write!(f, "{}: filters={}", Self::name(), self.dyn_filters.len())
156    }
157
158    fn with_exprs_and_inputs(
159        &self,
160        _exprs: Vec<Expr>,
161        mut inputs: Vec<LogicalPlan>,
162    ) -> Result<Self> {
163        let input = inputs.pop().ok_or_else(|| {
164            DataFusionError::Internal(format!("Expected exactly one input with {}", Self::name()))
165        })?;
166        let dyn_filters = self
167            .dyn_filters
168            .iter()
169            .map(|filter| remap_physical_expr_columns(filter.clone(), input.schema().as_arrow()))
170            .collect::<Result<Vec<_>>>()?;
171        Ok(Self::new(input, dyn_filters))
172    }
173}
174
175fn remap_physical_expr_columns(
176    expr: Arc<dyn PhysicalExpr>,
177    input_schema: &datafusion::arrow::datatypes::Schema,
178) -> Result<Arc<dyn PhysicalExpr>> {
179    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
180        return Ok(Arc::new(Column::new_with_schema(
181            column.name(),
182            input_schema,
183        )?));
184    }
185
186    let children = expr.children();
187    if children.is_empty() {
188        return Ok(expr);
189    }
190
191    let new_children = children
192        .into_iter()
193        .map(|child| remap_physical_expr_columns(child.clone(), input_schema))
194        .collect::<Result<Vec<_>>>()?;
195    expr.with_new_children(new_children)
196}
197
198pub struct RemoteDynFilterReceiverExtensionPlanner;
199
200#[async_trait]
201impl ExtensionPlanner for RemoteDynFilterReceiverExtensionPlanner {
202    async fn plan_extension(
203        &self,
204        _planner: &dyn PhysicalPlanner,
205        node: &dyn UserDefinedLogicalNode,
206        _logical_inputs: &[&LogicalPlan],
207        physical_inputs: &[Arc<dyn ExecutionPlan>],
208        _session_state: &SessionState,
209    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
210        let Some(receiver) = node
211            .as_any()
212            .downcast_ref::<RemoteDynFilterReceiverLogicalPlan>()
213        else {
214            return Ok(None);
215        };
216
217        let input = physical_inputs.first().cloned().ok_or_else(|| {
218            DataFusionError::Internal(format!("Expected exactly one input with {}", Self::name()))
219        })?;
220        if receiver.dyn_filters().is_empty() {
221            return Ok(Some(input));
222        }
223
224        let predicate = conjunction(receiver.dyn_filters().to_vec());
225        Ok(Some(Arc::new(FilterExec::try_new(predicate, input)?) as _))
226    }
227}
228
229impl RemoteDynFilterReceiverExtensionPlanner {
230    fn name() -> &'static str {
231        RemoteDynFilterReceiverLogicalPlan::name()
232    }
233}
234
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::physical_plan::expressions::{Column, DynamicFilterPhysicalExpr, lit};
    use datafusion_common::DFSchema;
    use datafusion_expr::{EmptyRelation, UserDefinedLogicalNodeCore};

    use super::*;

    /// Builds an `EmptyRelation` whose schema consists of `fields`.
    fn relation(fields: Vec<Field>) -> LogicalPlan {
        let schema = Arc::new(Schema::new(fields));
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::try_from(schema).unwrap()),
        })
    }

    fn ts_field() -> Field {
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )
    }

    /// Unpruned input: `ts`, `value`, `instance`, `job`.
    fn empty_input() -> LogicalPlan {
        relation(vec![
            ts_field(),
            Field::new("value", DataType::Float64, true),
            Field::new("instance", DataType::Utf8, true),
            Field::new("job", DataType::Utf8, true),
        ])
    }

    /// Input after projection pruning kept only `ts` and `instance`.
    fn pruned_input() -> LogicalPlan {
        relation(vec![ts_field(), Field::new("instance", DataType::Utf8, true)])
    }

    /// A placeholder (`true`) dynamic filter that references a single column.
    fn dyn_filter_on(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(DynamicFilterPhysicalExpr::new(
            vec![Arc::new(Column::new(name, index)) as Arc<_>],
            lit(true) as _,
        ))
    }

    #[test]
    fn necessary_children_exprs_keeps_parent_and_filter_columns() {
        let plan =
            RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![dyn_filter_on("ts", 0)]);

        // Parent only needs `value`, but the receiver must still keep `ts` for
        // evaluating its dynamic filter after logical projection pruning.
        let required = UserDefinedLogicalNodeCore::necessary_children_exprs(&plan, &[1]).unwrap();
        assert_eq!(required, vec![vec![0, 1]]);
    }

    #[test]
    fn necessary_children_exprs_is_transparent_without_filters() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![]);

        let required =
            UserDefinedLogicalNodeCore::necessary_children_exprs(&plan, &[1, 3]).unwrap();
        assert_eq!(required, vec![vec![1, 3]]);
    }

    #[test]
    fn necessary_children_exprs_dedups_overlapping_columns() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(
            empty_input(),
            vec![dyn_filter_on("ts", 0), dyn_filter_on("job", 3)],
        );

        // Parent columns overlap the filter columns and arrive unsorted.
        let required =
            UserDefinedLogicalNodeCore::necessary_children_exprs(&plan, &[3, 0]).unwrap();
        assert_eq!(required, vec![vec![0, 3]]);
    }

    #[test]
    fn with_exprs_and_inputs_remaps_dyn_filter_columns_to_pruned_input() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(
            empty_input(),
            vec![dyn_filter_on("instance", 2)],
        );

        let remapped =
            UserDefinedLogicalNodeCore::with_exprs_and_inputs(&plan, vec![], vec![pruned_input()])
                .unwrap();

        let columns = collect_columns(&remapped.dyn_filters()[0]);
        assert_eq!(columns.len(), 1);
        let column = columns.iter().next().unwrap();
        assert_eq!(column.name(), "instance");
        assert_eq!(column.index(), 1);
    }

    #[test]
    fn with_exprs_and_inputs_rejects_missing_input() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![]);

        let result = UserDefinedLogicalNodeCore::with_exprs_and_inputs(&plan, vec![], vec![]);
        assert!(result.is_err());
    }

    #[test]
    fn with_exprs_and_inputs_fails_when_filter_column_was_pruned() {
        // `value` is referenced by the filter but absent from the pruned input.
        let plan = RemoteDynFilterReceiverLogicalPlan::new(
            empty_input(),
            vec![dyn_filter_on("value", 1)],
        );

        let result =
            UserDefinedLogicalNodeCore::with_exprs_and_inputs(&plan, vec![], vec![pruned_input()]);
        assert!(result.is_err());
    }
}