1use std::cmp::Ordering;
16use std::fmt;
17use std::hash::{Hash, Hasher};
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use datafusion::common::Result;
22use datafusion::execution::context::SessionState;
23use datafusion::physical_expr::utils::conjunction;
24use datafusion::physical_plan::ExecutionPlan;
25use datafusion::physical_plan::expressions::Column;
26use datafusion::physical_plan::filter::FilterExec;
27use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
28use datafusion_common::{DFSchemaRef, DataFusionError};
29use datafusion_expr::{
30 Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
31};
32use datafusion_physical_expr::PhysicalExpr;
33use datafusion_physical_expr::utils::collect_columns;
34use session::context::QueryContextRef;
35
/// Signature of the callback stored by [`RemoteDynFilterReceiverInjector`]:
/// it receives a logical plan together with the query context and returns the
/// logical plan to use in its place.
type InjectRemoteDynFilterReceiver =
    dyn Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static;
38
/// Wraps a boxed plan-rewriting callback so it can be stored and shared
/// (see [`RemoteDynFilterReceiverInjectorRef`]).
pub struct RemoteDynFilterReceiverInjector {
    /// The callback invoked by `maybe_inject`.
    inject: Box<InjectRemoteDynFilterReceiver>,
}
43
44impl RemoteDynFilterReceiverInjector {
45 pub fn new(
46 inject: impl Fn(LogicalPlan, QueryContextRef) -> LogicalPlan + Send + Sync + 'static,
47 ) -> Self {
48 Self {
49 inject: Box::new(inject),
50 }
51 }
52
53 pub fn maybe_inject(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> LogicalPlan {
54 (self.inject)(plan, query_ctx)
55 }
56}
57
/// Shared, reference-counted handle to a [`RemoteDynFilterReceiverInjector`].
pub type RemoteDynFilterReceiverInjectorRef = Arc<RemoteDynFilterReceiverInjector>;
59
#[derive(Clone)]
/// Logical extension node that pairs a single input plan with a list of
/// physical dynamic-filter expressions.
///
/// The node exposes its input's schema unchanged. During physical planning
/// (`RemoteDynFilterReceiverExtensionPlanner`) it becomes a `FilterExec` over
/// the conjunction of `dyn_filters`, or just the input plan when the list is
/// empty.
pub struct RemoteDynFilterReceiverLogicalPlan {
    /// The only child of this node.
    input: Arc<LogicalPlan>,
    /// Physical filter expressions; their column references are re-resolved
    /// by name whenever the node is rebuilt with a new input.
    dyn_filters: Vec<Arc<dyn PhysicalExpr>>,
}
66
67impl RemoteDynFilterReceiverLogicalPlan {
68 pub fn new(input: LogicalPlan, dyn_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
69 Self {
70 input: Arc::new(input),
71 dyn_filters,
72 }
73 }
74
75 pub fn name() -> &'static str {
76 "RemoteDynFilterReceiver"
77 }
78
79 pub fn into_logical_plan(self) -> LogicalPlan {
80 LogicalPlan::Extension(Extension {
81 node: Arc::new(self),
82 })
83 }
84
85 fn dyn_filters(&self) -> &[Arc<dyn PhysicalExpr>] {
86 &self.dyn_filters
87 }
88
89 fn ord_key(&self) -> String {
90 format!("input={:?}, dyn_filters={:?}", self.input, self.dyn_filters)
91 }
92}
93
94impl fmt::Debug for RemoteDynFilterReceiverLogicalPlan {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
97 }
98}
99
100impl Hash for RemoteDynFilterReceiverLogicalPlan {
101 fn hash<H: Hasher>(&self, state: &mut H) {
102 self.input.hash(state);
103 self.dyn_filters.hash(state);
104 }
105}
106
107impl PartialEq for RemoteDynFilterReceiverLogicalPlan {
108 fn eq(&self, other: &Self) -> bool {
109 self.input == other.input && self.dyn_filters == other.dyn_filters
110 }
111}
112
// Marker impl: declares the `PartialEq` implementation of this type to be a
// full equivalence relation.
impl Eq for RemoteDynFilterReceiverLogicalPlan {}
114
115impl PartialOrd for RemoteDynFilterReceiverLogicalPlan {
116 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
117 Some(self.ord_key().cmp(&other.ord_key()))
118 }
119}
120
121impl UserDefinedLogicalNodeCore for RemoteDynFilterReceiverLogicalPlan {
122 fn name(&self) -> &str {
123 Self::name()
124 }
125
126 fn inputs(&self) -> Vec<&LogicalPlan> {
127 vec![self.input.as_ref()]
128 }
129
130 fn schema(&self) -> &DFSchemaRef {
131 self.input.schema()
132 }
133
134 fn expressions(&self) -> Vec<Expr> {
135 Vec::new()
136 }
137
138 fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
139 let mut required = output_columns.to_vec();
140
141 for filter in &self.dyn_filters {
142 required.extend(
143 collect_columns(filter)
144 .into_iter()
145 .map(|column| column.index()),
146 );
147 }
148
149 required.sort_unstable();
150 required.dedup();
151 Some(vec![required])
152 }
153
154 fn fmt_for_explain(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 write!(f, "{}: filters={}", Self::name(), self.dyn_filters.len())
156 }
157
158 fn with_exprs_and_inputs(
159 &self,
160 _exprs: Vec<Expr>,
161 mut inputs: Vec<LogicalPlan>,
162 ) -> Result<Self> {
163 let input = inputs.pop().ok_or_else(|| {
164 DataFusionError::Internal(format!("Expected exactly one input with {}", Self::name()))
165 })?;
166 let dyn_filters = self
167 .dyn_filters
168 .iter()
169 .map(|filter| remap_physical_expr_columns(filter.clone(), input.schema().as_arrow()))
170 .collect::<Result<Vec<_>>>()?;
171 Ok(Self::new(input, dyn_filters))
172 }
173}
174
175fn remap_physical_expr_columns(
176 expr: Arc<dyn PhysicalExpr>,
177 input_schema: &datafusion::arrow::datatypes::Schema,
178) -> Result<Arc<dyn PhysicalExpr>> {
179 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
180 return Ok(Arc::new(Column::new_with_schema(
181 column.name(),
182 input_schema,
183 )?));
184 }
185
186 let children = expr.children();
187 if children.is_empty() {
188 return Ok(expr);
189 }
190
191 let new_children = children
192 .into_iter()
193 .map(|child| remap_physical_expr_columns(child.clone(), input_schema))
194 .collect::<Result<Vec<_>>>()?;
195 expr.with_new_children(new_children)
196}
197
/// Extension planner that turns a [`RemoteDynFilterReceiverLogicalPlan`] into
/// its physical plan; other extension nodes are left for other planners.
pub struct RemoteDynFilterReceiverExtensionPlanner;
199
200#[async_trait]
201impl ExtensionPlanner for RemoteDynFilterReceiverExtensionPlanner {
202 async fn plan_extension(
203 &self,
204 _planner: &dyn PhysicalPlanner,
205 node: &dyn UserDefinedLogicalNode,
206 _logical_inputs: &[&LogicalPlan],
207 physical_inputs: &[Arc<dyn ExecutionPlan>],
208 _session_state: &SessionState,
209 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
210 let Some(receiver) = node
211 .as_any()
212 .downcast_ref::<RemoteDynFilterReceiverLogicalPlan>()
213 else {
214 return Ok(None);
215 };
216
217 let input = physical_inputs.first().cloned().ok_or_else(|| {
218 DataFusionError::Internal(format!("Expected exactly one input with {}", Self::name()))
219 })?;
220 if receiver.dyn_filters().is_empty() {
221 return Ok(Some(input));
222 }
223
224 let predicate = conjunction(receiver.dyn_filters().to_vec());
225 Ok(Some(Arc::new(FilterExec::try_new(predicate, input)?) as _))
226 }
227}
228
impl RemoteDynFilterReceiverExtensionPlanner {
    /// Name used in this planner's error messages; it is the name of the
    /// logical node being planned.
    fn name() -> &'static str {
        RemoteDynFilterReceiverLogicalPlan::name()
    }
}
234
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::physical_plan::expressions::{Column, DynamicFilterPhysicalExpr, lit};
    use datafusion_common::DFSchema;
    use datafusion_expr::{EmptyRelation, UserDefinedLogicalNodeCore};

    use super::*;

    /// Non-nullable millisecond timestamp column named `ts`.
    fn ts_field() -> Field {
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )
    }

    /// Nullable UTF-8 column with the given name.
    fn utf8_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    /// Empty relation (no rows produced) exposing `fields` as its schema.
    fn empty_relation(fields: Vec<Field>) -> LogicalPlan {
        let arrow_schema = Arc::new(Schema::new(fields));
        let df_schema = DFSchema::try_from(arrow_schema).unwrap();
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(df_schema),
        })
    }

    /// Full input: `ts`, `value`, `instance`, `job` (indices 0..=3).
    fn empty_input() -> LogicalPlan {
        empty_relation(vec![
            ts_field(),
            Field::new("value", DataType::Float64, true),
            utf8_field("instance"),
            utf8_field("job"),
        ])
    }

    /// Pruned input: only `ts` and `instance` (indices 0 and 1).
    fn pruned_input() -> LogicalPlan {
        empty_relation(vec![ts_field(), utf8_field("instance")])
    }

    /// Dynamic filter with a `true` literal as inner expression and a single
    /// child column `name` at `index`.
    fn dyn_filter_on(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new(name, index));
        Arc::new(DynamicFilterPhysicalExpr::new(vec![column], lit(true)))
    }

    #[test]
    fn necessary_children_exprs_keeps_parent_and_filter_columns() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(
            empty_input(),
            vec![dyn_filter_on("ts", 0)],
        );

        // The parent asks for column 1 and the filter reads column 0; both
        // must be kept.
        let required = UserDefinedLogicalNodeCore::necessary_children_exprs(&plan, &[1]).unwrap();
        assert_eq!(required, vec![vec![0, 1]]);
    }

    #[test]
    fn necessary_children_exprs_is_transparent_without_filters() {
        let plan = RemoteDynFilterReceiverLogicalPlan::new(empty_input(), vec![]);

        let required =
            UserDefinedLogicalNodeCore::necessary_children_exprs(&plan, &[1, 3]).unwrap();
        assert_eq!(required, vec![vec![1, 3]]);
    }

    #[test]
    fn with_exprs_and_inputs_remaps_dyn_filter_columns_to_pruned_input() {
        // `instance` is at index 2 in the full input and at index 1 after
        // pruning.
        let plan = RemoteDynFilterReceiverLogicalPlan::new(
            empty_input(),
            vec![dyn_filter_on("instance", 2)],
        );

        let remapped =
            UserDefinedLogicalNodeCore::with_exprs_and_inputs(&plan, vec![], vec![pruned_input()])
                .unwrap();

        let columns = collect_columns(&remapped.dyn_filters()[0]);
        assert_eq!(columns.len(), 1);
        let column = columns.iter().next().unwrap();
        assert_eq!(column.name(), "instance");
        assert_eq!(column.index(), 1);
    }
}