pipeline/etl/processor/
filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::prelude::Value as VrlValue;
18
19use crate::error::{
20    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
21    ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
26};
27use crate::Processor;
28
/// Name of this processor kind; returned by `kind()` and used in error reports.
pub(crate) const PROCESSOR_FILTER: &str = "filter";

/// YAML key selecting the match mode (only `"simple"` is recognized).
const MATCH_MODE_NAME: &str = "mode";
/// YAML key selecting the match operator (`"in"` or `"not_in"`).
const MATCH_OP_NAME: &str = "match_op";
/// YAML key toggling case-insensitive comparison.
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
/// YAML key holding the list of strings to compare field values against.
const TARGETS_NAME: &str = "targets";
35
36#[derive(Debug)]
37enum MatchMode {
38    SimpleMatch(MatchOp),
39}
40
41impl Default for MatchMode {
42    fn default() -> Self {
43        Self::SimpleMatch(MatchOp::default())
44    }
45}
46
/// Operator applied to the result of the target-set lookup.
#[derive(Debug, Default)]
enum MatchOp {
    /// Match when the value is one of the targets (the default).
    #[default]
    In,
    /// Match when the value is not one of the targets.
    NotIn,
}
53
54/// Filter out the whole line if matches.
55/// Ultimately it's a condition check, maybe we can use VRL to do more complex check.
56/// Implement simple string match for now. Can be extended later.
57#[derive(Debug, Default)]
58pub struct FilterProcessor {
59    fields: Fields,
60    mode: MatchMode,
61    case_insensitive: bool,
62    targets: HashSet<String>,
63}
64
65impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
66    type Error = Error;
67
68    // match mode can be extended in the future
69    #[allow(clippy::single_match)]
70    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
71        let mut fields = Fields::default();
72        let mut mode = MatchMode::default();
73        let mut op = MatchOp::default();
74        let mut case_insensitive = true;
75        let mut targets = HashSet::new();
76
77        for (k, v) in value.iter() {
78            let key = k
79                .as_str()
80                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
81            match key {
82                FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
83                FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
84                MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
85                    "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
86                    _ => {}
87                },
88                MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
89                    "in" => op = MatchOp::In,
90                    "not_in" => op = MatchOp::NotIn,
91                    _ => {}
92                },
93                CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
94                TARGETS_NAME => {
95                    yaml_strings(v, TARGETS_NAME)?
96                        .into_iter()
97                        .filter(|s| !s.is_empty())
98                        .for_each(|s| {
99                            targets.insert(s);
100                        });
101                }
102                _ => {}
103            }
104        }
105
106        if matches!(mode, MatchMode::SimpleMatch(_)) {
107            mode = MatchMode::SimpleMatch(op);
108        }
109
110        if targets.is_empty() {
111            return ProcessorMissingFieldSnafu {
112                processor: PROCESSOR_FILTER,
113                field: TARGETS_NAME.to_string(),
114            }
115            .fail();
116        }
117
118        if case_insensitive {
119            targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
120        }
121
122        Ok(FilterProcessor {
123            fields,
124            mode,
125            case_insensitive,
126            targets,
127        })
128    }
129}
130
131impl FilterProcessor {
132    fn match_target(&self, input: &str) -> bool {
133        let input = if self.case_insensitive {
134            &input.to_lowercase()
135        } else {
136            input
137        };
138
139        match &self.mode {
140            MatchMode::SimpleMatch(op) => match op {
141                MatchOp::In => self.targets.contains(input),
142                MatchOp::NotIn => !self.targets.contains(input),
143            },
144        }
145    }
146}
147
148impl Processor for FilterProcessor {
149    fn kind(&self) -> &str {
150        PROCESSOR_FILTER
151    }
152
153    fn ignore_missing(&self) -> bool {
154        true
155    }
156
157    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
158        for field in self.fields.iter() {
159            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
160            let index = field.input_field();
161            match val.get(index) {
162                Some(VrlValue::Bytes(b)) => {
163                    if self.match_target(&String::from_utf8_lossy(b)) {
164                        return Ok(VrlValue::Null);
165                    }
166                }
167                Some(v) => {
168                    return ProcessorExpectStringSnafu {
169                        processor: self.kind(),
170                        v: v.clone(),
171                    }
172                    .fail();
173                }
174                None => {}
175            }
176        }
177
178        Ok(val)
179    }
180}
181
#[cfg(test)]
mod test {
    use ahash::HashSet;
    use vrl::prelude::{Bytes, Value as VrlValue};
    use vrl::value::{KeyString, ObjectMap};

    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
    use crate::Processor;

    /// Builds a processor that checks the `name` field against one target.
    fn name_filter(op: MatchOp, case_insensitive: bool, target: &str) -> FilterProcessor {
        FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(op),
            case_insensitive,
            targets: HashSet::from_iter([target.to_string()]),
        }
    }

    /// Builds an object value with a single `name` entry.
    fn record(name: &'static str) -> VrlValue {
        VrlValue::Object(ObjectMap::from_iter([(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from(name)),
        )]))
    }

    #[test]
    fn test_eq() {
        let processor = name_filter(MatchOp::In, false, "John");

        // A value in the target set is filtered out.
        let filtered = processor.exec_mut(record("John")).unwrap();
        assert_eq!(filtered, VrlValue::Null);

        // Any other value passes through untouched.
        let input = record("Wick");
        let kept = processor.exec_mut(input.clone()).unwrap();
        assert_eq!(kept, input);
    }

    #[test]
    fn test_ne() {
        let processor = name_filter(MatchOp::NotIn, false, "John");

        // A value in the target set passes through untouched.
        let input = record("John");
        let kept = processor.exec_mut(input.clone()).unwrap();
        assert_eq!(kept, input);

        // Any other value is filtered out.
        let filtered = processor.exec_mut(record("Wick")).unwrap();
        assert_eq!(filtered, VrlValue::Null);
    }

    #[test]
    fn test_case() {
        let processor = name_filter(MatchOp::In, true, "john");

        // Mixed-case input matches the lowercase target.
        let filtered = processor.exec_mut(record("JoHN")).unwrap();
        assert_eq!(filtered, VrlValue::Null);
    }
}