1use std::fmt::{Display, Formatter};
16use std::hash::{DefaultHasher, Hasher};
17use std::num::ParseIntError;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use datafusion_common::{DataFusionError, Result};
22use datafusion_physical_expr::PhysicalExpr;
23use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
24use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
25use prost::Message;
26
/// Opaque 64-bit fingerprint of a filter's child expressions.
///
/// Rendered by `Display` as 16 zero-padded lowercase hex digits; `FromStr` reads hex back.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterFingerprint(u64);

impl FilterFingerprint {
    /// Wraps a raw 64-bit fingerprint value.
    pub fn new(value: u64) -> Self {
        FilterFingerprint(value)
    }

    /// Returns the wrapped 64-bit value.
    pub fn get(self) -> u64 {
        let FilterFingerprint(raw) = self;
        raw
    }
}

impl Display for FilterFingerprint {
    /// Writes exactly 16 lowercase hex digits, left-padded with zeros.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.get();
        write!(f, "{raw:016x}")
    }
}

impl FromStr for FilterFingerprint {
    type Err = ParseIntError;

    /// Parses a base-16 string such as the one produced by `Display`
    /// (leading zeros are optional).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(FilterFingerprint)
    }
}
53
/// Identifier of a remote dynamic-filter producer, stored as a raw `u64`.
///
/// Ordered by its numeric value. `Display` emits 16 zero-padded lowercase hex digits and
/// `FromStr` parses hex back.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemoteDynFilterProducerId(u64);

impl RemoteDynFilterProducerId {
    /// Creates a producer id from its raw numeric value.
    pub fn new(value: u64) -> Self {
        RemoteDynFilterProducerId(value)
    }

    /// Returns the raw numeric value of this producer id.
    pub fn get(self) -> u64 {
        let RemoteDynFilterProducerId(raw) = self;
        raw
    }
}

impl Display for RemoteDynFilterProducerId {
    /// Writes the id as 16 lowercase hex digits, zero-padded on the left.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.get();
        write!(f, "{raw:016x}")
    }
}

impl FromStr for RemoteDynFilterProducerId {
    type Err = ParseIntError;

    /// Parses a hexadecimal producer id (the `Display` form; zero padding is optional).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(RemoteDynFilterProducerId)
    }
}
83
84#[derive(Clone, Debug, PartialEq, Eq, Hash)]
85pub struct FilterId {
86 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
87 producer_ordinal: u32,
88 children_fingerprint: FilterFingerprint,
89}
90
91impl FilterId {
95 pub fn new(
96 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
97 producer_ordinal: u32,
98 children_fingerprint: FilterFingerprint,
99 ) -> Self {
100 Self {
101 remote_dyn_filter_producer_id,
102 producer_ordinal,
103 children_fingerprint,
104 }
105 }
106
107 pub fn producer_ordinal(&self) -> u32 {
108 self.producer_ordinal
109 }
110
111 pub fn remote_dyn_filter_producer_id(&self) -> RemoteDynFilterProducerId {
112 self.remote_dyn_filter_producer_id
113 }
114
115 pub fn children_fingerprint(&self) -> FilterFingerprint {
116 self.children_fingerprint
117 }
118}
119
120impl Display for FilterId {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 write!(
123 f,
124 "{}:{}:{}",
125 self.remote_dyn_filter_producer_id, self.producer_ordinal, self.children_fingerprint
126 )
127 }
128}
129
130impl FromStr for FilterId {
131 type Err = ParseFilterIdError;
132
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
134 let mut parts = s.split(':');
135 let remote_dyn_filter_producer_id = parts
136 .next()
137 .ok_or(ParseFilterIdError)?
138 .parse::<RemoteDynFilterProducerId>()
139 .map_err(|_| ParseFilterIdError)?;
140 let producer_local_ordinal = parts
141 .next()
142 .ok_or(ParseFilterIdError)?
143 .parse::<u32>()
144 .map_err(|_| ParseFilterIdError)?;
145 let children_fingerprint = parts
146 .next()
147 .ok_or(ParseFilterIdError)?
148 .parse::<FilterFingerprint>()
149 .map_err(|_| ParseFilterIdError)?;
150 if parts.next().is_some() {
151 return Err(ParseFilterIdError);
152 }
153
154 Ok(Self::new(
155 remote_dyn_filter_producer_id,
156 producer_local_ordinal,
157 children_fingerprint,
158 ))
159 }
160}
161
/// Error returned when a string cannot be parsed as a [`FilterId`].
///
/// Parsing fails on a wrong number of `:`-separated fields, or on any field that does
/// not parse as its component type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ParseFilterIdError;

impl Display for ParseFilterIdError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("invalid filter id")
    }
}

// Implementing `Error` allows this to be boxed as `dyn Error`, wrapped in
// `DataFusionError::External`, or propagated with `?` into generic error types.
impl std::error::Error for ParseFilterIdError {}
170
171pub(crate) fn build_remote_dyn_filter_id(
175 remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
176 producer_local_ordinal: usize,
177 children: &[Arc<dyn PhysicalExpr>],
178) -> Result<FilterId> {
179 let children_fingerprint = canonicalize_dyn_filter_children(children)?;
180 let producer_local_ordinal = u32::try_from(producer_local_ordinal).map_err(|err| {
181 let _ = err;
182 DataFusionError::Execution("producer ordinal out of range for filter id".to_string())
183 })?;
184 Ok(FilterId::new(
185 remote_dyn_filter_producer_id,
186 producer_local_ordinal,
187 children_fingerprint,
188 ))
189}
190
191fn canonicalize_dyn_filter_children(
192 children: &[Arc<dyn PhysicalExpr>],
193) -> Result<FilterFingerprint> {
194 let codec = DefaultPhysicalExtensionCodec {};
195 let mut hasher = DefaultHasher::new();
196 hasher.write_usize(children.len());
197
198 for child in children {
199 let proto = serialize_physical_expr(child, &codec)?;
200 let mut bytes = Vec::new();
201 proto
202 .encode(&mut bytes)
203 .map_err(|e| DataFusionError::External(Box::new(e)))?;
204 hasher.write_usize(bytes.len());
205 hasher.write(&bytes);
206 }
207
208 Ok(FilterFingerprint::new(hasher.finish()))
209}
210
#[cfg(test)]
mod tests {
    use datafusion_physical_expr::expressions::Column;

    use super::*;

    /// Builds one `Column` per name; each column's index is its position in `names`.
    fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
        let mut children: Vec<Arc<dyn PhysicalExpr>> = Vec::with_capacity(names.len());
        for (index, name) in names.iter().enumerate() {
            children.push(Arc::new(Column::new(name, index)));
        }
        children
    }

    #[test]
    fn filter_id_round_trips_through_string() {
        let filter_id = FilterId::new(
            RemoteDynFilterProducerId::new(42),
            3,
            FilterFingerprint::new(0xabc),
        );

        let encoded = format!("{filter_id}");
        assert_eq!(encoded, "000000000000002a:3:0000000000000abc");

        let decoded = FilterId::from_str(&encoded).unwrap();
        assert_eq!(decoded, filter_id);
    }

    #[test]
    fn filter_id_rejects_malformed_strings() {
        let malformed_inputs = [
            "",
            "0000000000000001:3",
            "0000000000000001:3:zzzz",
            "0000000000000001:3:0000000000000abc:extra",
        ];
        for input in malformed_inputs {
            assert!(
                FilterId::from_str(input).is_err(),
                "expected {input:?} to be rejected"
            );
        }
    }

    #[test]
    fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
        let producer_id = RemoteDynFilterProducerId::new(42);
        let build = || {
            build_remote_dyn_filter_id(producer_id, 3, &test_children(&["host", "pod"])).unwrap()
        };

        assert_eq!(build(), build());
    }

    #[test]
    fn remote_dyn_filter_id_changes_when_remote_dyn_filter_producer_changes() {
        let children = test_children(&["host", "pod"]);
        let id_for = |producer: u64| {
            build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(producer), 3, &children)
                .unwrap()
        };

        assert_ne!(id_for(42), id_for(43));
    }

    #[test]
    fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
        let producer_id = RemoteDynFilterProducerId::new(42);
        let host_then_pod = test_children(&["host", "pod"]);
        let pod_then_host = test_children(&["pod", "host"]);

        let baseline = build_remote_dyn_filter_id(producer_id, 3, &host_then_pod).unwrap();
        let other_ordinal = build_remote_dyn_filter_id(producer_id, 4, &host_then_pod).unwrap();
        let other_children = build_remote_dyn_filter_id(producer_id, 3, &pod_then_host).unwrap();

        assert_ne!(baseline, other_ordinal);
        assert_ne!(baseline, other_children);
    }

    #[test]
    fn remote_dyn_filter_id_supports_empty_children() {
        let producer_id = RemoteDynFilterProducerId::new(42);
        let no_children: &[Arc<dyn PhysicalExpr>] = &[];

        let first = build_remote_dyn_filter_id(producer_id, 1, no_children).unwrap();
        let second = build_remote_dyn_filter_id(producer_id, 1, no_children).unwrap();

        assert_eq!(first, second);
        assert_eq!(first.remote_dyn_filter_producer_id(), producer_id);
        assert_eq!(first.producer_ordinal(), 1);
        assert_eq!(first.children_fingerprint(), second.children_fingerprint());
    }

    #[test]
    fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
        let result =
            build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), usize::MAX, &[]);
        let message = result.unwrap_err().to_string();

        assert!(message.contains("producer ordinal out of range for filter id"));
    }
}