Skip to main content

query/dist_plan/
filter_id.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16use std::hash::{DefaultHasher, Hasher};
17use std::num::ParseIntError;
18use std::str::FromStr;
19use std::sync::Arc;
20
21use datafusion_common::{DataFusionError, Result};
22use datafusion_physical_expr::PhysicalExpr;
23use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
24use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
25use prost::Message;
26
/// Fingerprint of the serialized children of a remote dynamic filter.
///
/// [`Display`] renders the value as exactly 16 zero-padded lowercase hex digits;
/// [`FromStr`] parses it back from hexadecimal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterFingerprint(u64);

impl FilterFingerprint {
    /// Wraps a raw 64-bit fingerprint value.
    pub fn new(value: u64) -> Self {
        FilterFingerprint(value)
    }

    /// Returns the raw 64-bit fingerprint value.
    pub fn get(self) -> u64 {
        let FilterFingerprint(raw) = self;
        raw
    }
}

impl Display for FilterFingerprint {
    /// Writes the fingerprint as 16 zero-padded lowercase hex digits.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.get();
        write!(f, "{raw:016x}")
    }
}

impl FromStr for FilterFingerprint {
    type Err = ParseIntError;

    /// Parses a hexadecimal fingerprint.
    ///
    /// Accepts anything `u64::from_str_radix(_, 16)` accepts, so the input does not
    /// have to be zero-padded to 16 digits.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(FilterFingerprint::new)
    }
}
53
/// Query-local identity of one remote dynamic filter producer.
///
/// Distinguishes independent producers (MergeScan nodes) that may share producer
/// ordinals and child fingerprints. [`Display`] renders it as 16 zero-padded
/// lowercase hex digits and [`FromStr`] parses it back from hexadecimal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemoteDynFilterProducerId(u64);

impl RemoteDynFilterProducerId {
    /// Wraps a raw 64-bit producer id.
    pub fn new(value: u64) -> Self {
        RemoteDynFilterProducerId(value)
    }

    /// Returns the raw 64-bit producer id.
    pub fn get(self) -> u64 {
        let RemoteDynFilterProducerId(raw) = self;
        raw
    }
}

impl Display for RemoteDynFilterProducerId {
    /// Writes the producer id as 16 zero-padded lowercase hex digits.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:016x}", self.get()))
    }
}

impl FromStr for RemoteDynFilterProducerId {
    type Err = ParseIntError;

    /// Parses a hexadecimal producer id (zero padding is optional).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(RemoteDynFilterProducerId::new)
    }
}
83
84#[derive(Clone, Debug, PartialEq, Eq, Hash)]
85pub struct FilterId {
86    remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
87    producer_ordinal: u32,
88    children_fingerprint: FilterFingerprint,
89}
90
91// NOTE(remote-dyn-filter): FilterId is source-generated and propagated; consumers should not
92// recompute it from local scan state.
93
94impl FilterId {
95    pub fn new(
96        remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
97        producer_ordinal: u32,
98        children_fingerprint: FilterFingerprint,
99    ) -> Self {
100        Self {
101            remote_dyn_filter_producer_id,
102            producer_ordinal,
103            children_fingerprint,
104        }
105    }
106
107    pub fn producer_ordinal(&self) -> u32 {
108        self.producer_ordinal
109    }
110
111    pub fn remote_dyn_filter_producer_id(&self) -> RemoteDynFilterProducerId {
112        self.remote_dyn_filter_producer_id
113    }
114
115    pub fn children_fingerprint(&self) -> FilterFingerprint {
116        self.children_fingerprint
117    }
118}
119
120impl Display for FilterId {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        write!(
123            f,
124            "{}:{}:{}",
125            self.remote_dyn_filter_producer_id, self.producer_ordinal, self.children_fingerprint
126        )
127    }
128}
129
130impl FromStr for FilterId {
131    type Err = ParseFilterIdError;
132
133    fn from_str(s: &str) -> Result<Self, Self::Err> {
134        let mut parts = s.split(':');
135        let remote_dyn_filter_producer_id = parts
136            .next()
137            .ok_or(ParseFilterIdError)?
138            .parse::<RemoteDynFilterProducerId>()
139            .map_err(|_| ParseFilterIdError)?;
140        let producer_local_ordinal = parts
141            .next()
142            .ok_or(ParseFilterIdError)?
143            .parse::<u32>()
144            .map_err(|_| ParseFilterIdError)?;
145        let children_fingerprint = parts
146            .next()
147            .ok_or(ParseFilterIdError)?
148            .parse::<FilterFingerprint>()
149            .map_err(|_| ParseFilterIdError)?;
150        if parts.next().is_some() {
151            return Err(ParseFilterIdError);
152        }
153
154        Ok(Self::new(
155            remote_dyn_filter_producer_id,
156            producer_local_ordinal,
157            children_fingerprint,
158        ))
159    }
160}
161
/// Error returned when a string is not a valid `producer_id:ordinal:fingerprint`
/// filter id encoding.
///
/// Implements [`std::error::Error`] so it can be boxed or wrapped by callers
/// (e.g. into `Box<dyn Error>` or an external-error variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParseFilterIdError;

impl Display for ParseFilterIdError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("invalid filter id")
    }
}

impl std::error::Error for ParseFilterIdError {}
170
171/// Builds the query-local remote dynamic filter identity.
172///
173/// Identity is producer id + local ordinal + child fingerprint; routing stays outside.
174pub(crate) fn build_remote_dyn_filter_id(
175    remote_dyn_filter_producer_id: RemoteDynFilterProducerId,
176    producer_local_ordinal: usize,
177    children: &[Arc<dyn PhysicalExpr>],
178) -> Result<FilterId> {
179    let children_fingerprint = canonicalize_dyn_filter_children(children)?;
180    let producer_local_ordinal = u32::try_from(producer_local_ordinal).map_err(|err| {
181        let _ = err;
182        DataFusionError::Execution("producer ordinal out of range for filter id".to_string())
183    })?;
184    Ok(FilterId::new(
185        remote_dyn_filter_producer_id,
186        producer_local_ordinal,
187        children_fingerprint,
188    ))
189}
190
191fn canonicalize_dyn_filter_children(
192    children: &[Arc<dyn PhysicalExpr>],
193) -> Result<FilterFingerprint> {
194    let codec = DefaultPhysicalExtensionCodec {};
195    let mut hasher = DefaultHasher::new();
196    hasher.write_usize(children.len());
197
198    for child in children {
199        let proto = serialize_physical_expr(child, &codec)?;
200        let mut bytes = Vec::new();
201        proto
202            .encode(&mut bytes)
203            .map_err(|e| DataFusionError::External(Box::new(e)))?;
204        hasher.write_usize(bytes.len());
205        hasher.write(&bytes);
206    }
207
208    Ok(FilterFingerprint::new(hasher.finish()))
209}
210
#[cfg(test)]
mod tests {
    use datafusion_physical_expr::expressions::Column;

    use super::*;

    /// Builds one `Column` child per name, using its position as the column index.
    fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
        names
            .iter()
            .enumerate()
            .map(|(index, name)| Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>)
            .collect()
    }

    #[test]
    fn filter_id_round_trips_through_string() {
        let filter_id = FilterId::new(
            RemoteDynFilterProducerId::new(42),
            3,
            FilterFingerprint::new(0xabc),
        );
        let encoded = filter_id.to_string();

        assert_eq!(encoded, "000000000000002a:3:0000000000000abc");
        assert_eq!(encoded.parse::<FilterId>().unwrap(), filter_id);
    }

    #[test]
    fn filter_id_round_trips_extreme_components() {
        let filter_id = FilterId::new(
            RemoteDynFilterProducerId::new(u64::MAX),
            u32::MAX,
            FilterFingerprint::new(0),
        );
        let encoded = filter_id.to_string();

        assert_eq!(encoded, "ffffffffffffffff:4294967295:0000000000000000");
        assert_eq!(encoded.parse::<FilterId>().unwrap(), filter_id);
    }

    #[test]
    fn filter_id_rejects_malformed_strings() {
        assert!("".parse::<FilterId>().is_err());
        assert!("0000000000000001:3".parse::<FilterId>().is_err());
        assert!("0000000000000001:3:zzzz".parse::<FilterId>().is_err());
        assert!(
            "0000000000000001:3:0000000000000abc:extra"
                .parse::<FilterId>()
                .is_err()
        );
        // The ordinal must fit in a u32.
        assert!(
            "0000000000000001:4294967296:0000000000000abc"
                .parse::<FilterId>()
                .is_err()
        );
        assert!(
            "0000000000000001:-1:0000000000000abc"
                .parse::<FilterId>()
                .is_err()
        );
    }

    #[test]
    fn filter_fingerprint_formats_as_padded_hex_and_round_trips() {
        for value in [0, 0xabc, u64::MAX] {
            let fingerprint = FilterFingerprint::new(value);
            let encoded = fingerprint.to_string();

            assert_eq!(encoded.len(), 16);
            assert_eq!(encoded.parse::<FilterFingerprint>().unwrap(), fingerprint);
            assert_eq!(fingerprint.get(), value);
        }
        assert_eq!(
            FilterFingerprint::new(u64::MAX).to_string(),
            "ffffffffffffffff"
        );
        assert!("zz".parse::<FilterFingerprint>().is_err());
    }

    #[test]
    fn remote_dyn_filter_producer_id_formats_as_padded_hex_and_round_trips() {
        for value in [0, 42, u64::MAX] {
            let producer_id = RemoteDynFilterProducerId::new(value);
            let encoded = producer_id.to_string();

            assert_eq!(encoded.len(), 16);
            assert_eq!(
                encoded.parse::<RemoteDynFilterProducerId>().unwrap(),
                producer_id
            );
            assert_eq!(producer_id.get(), value);
        }
        assert_eq!(
            RemoteDynFilterProducerId::new(42).to_string(),
            "000000000000002a"
        );
        assert!("xyz".parse::<RemoteDynFilterProducerId>().is_err());
    }

    #[test]
    fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
        let first = build_remote_dyn_filter_id(
            remote_dyn_filter_producer_id,
            3,
            &test_children(&["host", "pod"]),
        )
        .unwrap();
        let second = build_remote_dyn_filter_id(
            remote_dyn_filter_producer_id,
            3,
            &test_children(&["host", "pod"]),
        )
        .unwrap();

        assert_eq!(first, second);
    }

    #[test]
    fn remote_dyn_filter_id_changes_when_remote_dyn_filter_producer_changes() {
        let children = test_children(&["host", "pod"]);
        let baseline =
            build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), 3, &children).unwrap();
        let different_producer_id =
            build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(43), 3, &children).unwrap();

        assert_ne!(baseline, different_producer_id);
    }

    #[test]
    fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
        let children = test_children(&["host", "pod"]);
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
        let baseline =
            build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 3, &children).unwrap();
        let different_ordinal =
            build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 4, &children).unwrap();
        let different_children = build_remote_dyn_filter_id(
            remote_dyn_filter_producer_id,
            3,
            &test_children(&["pod", "host"]),
        )
        .unwrap();

        assert_ne!(baseline, different_ordinal);
        assert_ne!(baseline, different_children);
    }

    #[test]
    fn remote_dyn_filter_id_supports_empty_children() {
        let remote_dyn_filter_producer_id = RemoteDynFilterProducerId::new(42);
        let first = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap();
        let second = build_remote_dyn_filter_id(remote_dyn_filter_producer_id, 1, &[]).unwrap();

        assert_eq!(first, second);
        assert_eq!(
            first.remote_dyn_filter_producer_id(),
            remote_dyn_filter_producer_id
        );
        assert_eq!(first.producer_ordinal(), 1);
        assert_eq!(first.children_fingerprint(), second.children_fingerprint());
    }

    #[test]
    fn remote_dyn_filter_id_accepts_max_u32_producer_ordinal() {
        let max_ordinal = usize::try_from(u32::MAX).unwrap();
        let filter_id =
            build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), max_ordinal, &[])
                .unwrap();

        assert_eq!(filter_id.producer_ordinal(), u32::MAX);
    }

    #[test]
    fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
        // On targets where usize is at most 32 bits every ordinal fits in a u32, so the
        // error path is unreachable there; `usize::MAX` would wrongly succeed.
        let Ok(too_large) = usize::try_from(u64::from(u32::MAX) + 1) else {
            return;
        };
        let error = build_remote_dyn_filter_id(RemoteDynFilterProducerId::new(42), too_large, &[])
            .unwrap_err()
            .to_string();

        assert!(error.contains("producer ordinal out of range for filter id"));
    }
}