1use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::prelude::Value as VrlValue;
18
19use crate::Processor;
20use crate::error::{
21 Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
22 ValueMustBeMapSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{
26 FIELD_NAME, FIELDS_NAME, yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings,
27};
28
/// Identifier of the filter processor; returned by its `kind()` and used in
/// the errors it raises.
pub(crate) const PROCESSOR_FILTER: &str = "filter";

// Keys read from the processor's YAML configuration map.

/// Key selecting the match mode (`"simple"` is the only recognised value).
const MATCH_MODE_NAME: &str = "mode";
/// Key selecting the match operator (`"in"` or `"not_in"`).
const MATCH_OP_NAME: &str = "match_op";
/// Key toggling case-insensitive comparison.
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
/// Key holding the list of target strings to compare against.
const TARGETS_NAME: &str = "targets";
35
/// Strategy used to decide whether a field value matches.
///
/// A single strategy exists so far: looking the value up in the target set.
#[derive(Debug)]
enum MatchMode {
    /// Set lookup; the wrapped operator decides whether presence or absence
    /// in the set counts as a match.
    SimpleMatch(MatchOp),
}

impl Default for MatchMode {
    /// A simple match using the default operator.
    fn default() -> Self {
        MatchMode::SimpleMatch(Default::default())
    }
}

/// Operator applied by [`MatchMode::SimpleMatch`].
#[derive(Debug, Default)]
enum MatchOp {
    /// The value matches when it is one of the targets (the default).
    #[default]
    In,
    /// The value matches when it is not one of the targets.
    NotIn,
}
53
54#[derive(Debug, Default)]
58pub struct FilterProcessor {
59 fields: Fields,
60 mode: MatchMode,
61 case_insensitive: bool,
62 targets: HashSet<String>,
63}
64
65impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
66 type Error = Error;
67
68 #[allow(clippy::single_match)]
70 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
71 let mut fields = Fields::default();
72 let mut mode = MatchMode::default();
73 let mut op = MatchOp::default();
74 let mut case_insensitive = true;
75 let mut targets = HashSet::new();
76
77 for (k, v) in value.iter() {
78 let key = k
79 .as_str()
80 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
81 match key {
82 FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
83 FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
84 MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
85 "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
86 _ => {}
87 },
88 MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
89 "in" => op = MatchOp::In,
90 "not_in" => op = MatchOp::NotIn,
91 _ => {}
92 },
93 CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
94 TARGETS_NAME => {
95 yaml_strings(v, TARGETS_NAME)?
96 .into_iter()
97 .filter(|s| !s.is_empty())
98 .for_each(|s| {
99 targets.insert(s);
100 });
101 }
102 _ => {}
103 }
104 }
105
106 if matches!(mode, MatchMode::SimpleMatch(_)) {
107 mode = MatchMode::SimpleMatch(op);
108 }
109
110 if targets.is_empty() {
111 return ProcessorMissingFieldSnafu {
112 processor: PROCESSOR_FILTER,
113 field: TARGETS_NAME.to_string(),
114 }
115 .fail();
116 }
117
118 if case_insensitive {
119 targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
120 }
121
122 Ok(FilterProcessor {
123 fields,
124 mode,
125 case_insensitive,
126 targets,
127 })
128 }
129}
130
131impl FilterProcessor {
132 fn match_target(&self, input: &str) -> bool {
133 let input = if self.case_insensitive {
134 &input.to_lowercase()
135 } else {
136 input
137 };
138
139 match &self.mode {
140 MatchMode::SimpleMatch(op) => match op {
141 MatchOp::In => self.targets.contains(input),
142 MatchOp::NotIn => !self.targets.contains(input),
143 },
144 }
145 }
146}
147
148impl Processor for FilterProcessor {
149 fn kind(&self) -> &str {
150 PROCESSOR_FILTER
151 }
152
153 fn ignore_missing(&self) -> bool {
154 true
155 }
156
157 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
158 for field in self.fields.iter() {
159 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
160 let index = field.input_field();
161 if let Some(v) = val.get(index) {
162 if let VrlValue::Bytes(b) = v {
163 if self.match_target(&String::from_utf8_lossy(b)) {
164 return Ok(VrlValue::Null);
165 }
166 } else {
167 return ProcessorExpectStringSnafu {
168 processor: self.kind(),
169 v: v.clone(),
170 }
171 .fail();
172 }
173 }
174 }
175
176 Ok(val)
177 }
178}
179
#[cfg(test)]
mod test {
    use ahash::HashSet;
    use vrl::prelude::{Bytes, Value as VrlValue};
    use vrl::value::{KeyString, ObjectMap};

    use crate::Processor;
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};

    /// Builds a processor that checks the `name` field against one target.
    fn filter_on_name(op: MatchOp, case_insensitive: bool, target: &str) -> FilterProcessor {
        FilterProcessor {
            fields: Fields::one(Field::new("name", None)),
            mode: MatchMode::SimpleMatch(op),
            case_insensitive,
            targets: HashSet::from_iter([target.to_string()]),
        }
    }

    /// Builds a one-entry record `{ "name": <name> }`.
    fn record_named(name: &'static str) -> VrlValue {
        VrlValue::Object(ObjectMap::from_iter([(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from(name)),
        )]))
    }

    #[test]
    fn test_eq() {
        let processor = filter_on_name(MatchOp::In, false, "John");

        // A listed value replaces the record with null.
        let dropped = processor.exec_mut(record_named("John")).unwrap();
        assert_eq!(dropped, VrlValue::Null);

        // An unlisted value leaves the record as it was.
        let kept = record_named("Wick");
        let expect = kept.clone();
        assert_eq!(processor.exec_mut(kept).unwrap(), expect);
    }

    #[test]
    fn test_ne() {
        let processor = filter_on_name(MatchOp::NotIn, false, "John");

        // A listed value leaves the record as it was.
        let kept = record_named("John");
        let expect = kept.clone();
        assert_eq!(processor.exec_mut(kept).unwrap(), expect);

        // An unlisted value replaces the record with null.
        let dropped = processor.exec_mut(record_named("Wick")).unwrap();
        assert_eq!(dropped, VrlValue::Null);
    }

    #[test]
    fn test_case() {
        let processor = filter_on_name(MatchOp::In, true, "john");

        // Mixed-case input matches the lowercase target.
        let dropped = processor.exec_mut(record_named("JoHN")).unwrap();
        assert_eq!(dropped, VrlValue::Null);
    }
}