Skip to main content

pipeline/etl/processor/
filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::prelude::Value as VrlValue;
18
19use crate::Processor;
20use crate::error::{
21    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
22    ValueMustBeMapSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{
26    FIELD_NAME, FIELDS_NAME, yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings,
27};
28
/// Processor kind string for this processor; returned by `FilterProcessor::kind`.
pub(crate) const PROCESSOR_FILTER: &str = "filter";

// YAML configuration keys understood by the filter processor.

/// Key selecting the match mode; only `"simple"` is currently recognised.
const MATCH_MODE_NAME: &str = "mode";
/// Key selecting the membership operation: `"in"` or `"not_in"`.
const MATCH_OP_NAME: &str = "match_op";
/// Key toggling case-insensitive comparison (treated as `true` when absent).
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
/// Key listing the string values that field values are compared against.
const TARGETS_NAME: &str = "targets";
35
/// Strategy used to compare a field value against the configured targets.
///
/// Only one strategy exists today; the enum leaves room for further match kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MatchMode {
    /// Membership test of the value in the target set, using the given operation.
    SimpleMatch(MatchOp),
}

impl Default for MatchMode {
    /// Simple matching with the default operation (`MatchOp::In`).
    fn default() -> Self {
        Self::SimpleMatch(MatchOp::default())
    }
}

/// Membership operation applied by `MatchMode::SimpleMatch`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum MatchOp {
    /// Matches when the value is one of the targets.
    #[default]
    In,
    /// Matches when the value is none of the targets.
    NotIn,
}
53
54/// Filter out the whole line if matches.
55/// Ultimately it's a condition check, maybe we can use VRL to do more complex check.
56/// Implement simple string match for now. Can be extended later.
57#[derive(Debug, Default)]
58pub struct FilterProcessor {
59    fields: Fields,
60    mode: MatchMode,
61    case_insensitive: bool,
62    targets: HashSet<String>,
63}
64
65impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
66    type Error = Error;
67
68    // match mode can be extended in the future
69    #[allow(clippy::single_match)]
70    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
71        let mut fields = Fields::default();
72        let mut mode = MatchMode::default();
73        let mut op = MatchOp::default();
74        let mut case_insensitive = true;
75        let mut targets = HashSet::new();
76
77        for (k, v) in value.iter() {
78            let key = k
79                .as_str()
80                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
81            match key {
82                FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
83                FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
84                MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
85                    "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
86                    _ => {}
87                },
88                MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
89                    "in" => op = MatchOp::In,
90                    "not_in" => op = MatchOp::NotIn,
91                    _ => {}
92                },
93                CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
94                TARGETS_NAME => {
95                    yaml_strings(v, TARGETS_NAME)?
96                        .into_iter()
97                        .filter(|s| !s.is_empty())
98                        .for_each(|s| {
99                            targets.insert(s);
100                        });
101                }
102                _ => {}
103            }
104        }
105
106        if matches!(mode, MatchMode::SimpleMatch(_)) {
107            mode = MatchMode::SimpleMatch(op);
108        }
109
110        if targets.is_empty() {
111            return ProcessorMissingFieldSnafu {
112                processor: PROCESSOR_FILTER,
113                field: TARGETS_NAME.to_string(),
114            }
115            .fail();
116        }
117
118        if case_insensitive {
119            targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
120        }
121
122        Ok(FilterProcessor {
123            fields,
124            mode,
125            case_insensitive,
126            targets,
127        })
128    }
129}
130
131impl FilterProcessor {
132    fn match_target(&self, input: &str) -> bool {
133        let input = if self.case_insensitive {
134            &input.to_lowercase()
135        } else {
136            input
137        };
138
139        match &self.mode {
140            MatchMode::SimpleMatch(op) => match op {
141                MatchOp::In => self.targets.contains(input),
142                MatchOp::NotIn => !self.targets.contains(input),
143            },
144        }
145    }
146}
147
148impl Processor for FilterProcessor {
149    fn kind(&self) -> &str {
150        PROCESSOR_FILTER
151    }
152
153    fn ignore_missing(&self) -> bool {
154        true
155    }
156
157    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
158        for field in self.fields.iter() {
159            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
160            let index = field.input_field();
161            if let Some(v) = val.get(index) {
162                if let VrlValue::Bytes(b) = v {
163                    if self.match_target(&String::from_utf8_lossy(b)) {
164                        return Ok(VrlValue::Null);
165                    }
166                } else {
167                    return ProcessorExpectStringSnafu {
168                        processor: self.kind(),
169                        v: v.clone(),
170                    }
171                    .fail();
172                }
173            }
174        }
175
176        Ok(val)
177    }
178}
179
#[cfg(test)]
mod test {
    use ahash::HashSet;
    use vrl::prelude::{Bytes, Value as VrlValue};
    use vrl::value::{KeyString, ObjectMap};

    use crate::Processor;
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};

    /// Builds a processor that inspects the `name` field.
    fn name_filter(op: MatchOp, case_insensitive: bool, targets: &[&str]) -> FilterProcessor {
        FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(op),
            case_insensitive,
            targets: targets
                .iter()
                .map(|target| target.to_string())
                .collect::<HashSet<_>>(),
        }
    }

    /// Builds a single-entry record `{ key: value }`.
    fn record(key: &str, value: VrlValue) -> VrlValue {
        VrlValue::Object(ObjectMap::from_iter(vec![(KeyString::from(key), value)]))
    }

    /// Builds a string (bytes) value.
    fn text(s: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(s.to_owned()))
    }

    #[test]
    fn test_eq() {
        let processor = name_filter(MatchOp::In, false, &["John"]);

        let result = processor.exec_mut(record("name", text("John"))).unwrap();
        assert_eq!(result, VrlValue::Null);

        let val = record("name", text("Wick"));
        let expect = val.clone();
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ne() {
        let processor = name_filter(MatchOp::NotIn, false, &["John"]);

        let val = record("name", text("John"));
        let expect = val.clone();
        let result = processor.exec_mut(val).unwrap();
        assert_eq!(result, expect);

        let result = processor.exec_mut(record("name", text("Wick"))).unwrap();
        assert_eq!(result, VrlValue::Null);
    }

    #[test]
    fn test_case() {
        let processor = name_filter(MatchOp::In, true, &["john"]);

        let result = processor.exec_mut(record("name", text("JoHN"))).unwrap();
        assert_eq!(result, VrlValue::Null);

        let val = record("name", text("Wick"));
        let expect = val.clone();
        assert_eq!(processor.exec_mut(val).unwrap(), expect);

        // Without case folding, differently-cased input must not match.
        let processor = name_filter(MatchOp::In, false, &["john"]);
        let val = record("name", text("JoHN"));
        let expect = val.clone();
        assert_eq!(processor.exec_mut(val).unwrap(), expect);
    }

    #[test]
    fn test_missing_field_passes_through() {
        // A record lacking the inspected field is kept regardless of the operation.
        for op in [MatchOp::In, MatchOp::NotIn] {
            let processor = name_filter(op, false, &["John"]);
            let val = record("age", VrlValue::Integer(42));
            let expect = val.clone();
            assert_eq!(processor.exec_mut(val).unwrap(), expect);
        }
    }

    #[test]
    fn test_non_string_field_is_rejected() {
        let processor = name_filter(MatchOp::In, false, &["John"]);
        assert!(
            processor
                .exec_mut(record("name", VrlValue::Integer(42)))
                .is_err()
        );
    }

    #[test]
    fn test_non_object_value_is_rejected() {
        let processor = name_filter(MatchOp::In, false, &["John"]);
        assert!(processor.exec_mut(VrlValue::Integer(1)).is_err());
    }
}
257}