1use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::prelude::Value as VrlValue;
18
19use crate::error::{
20 Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
21 ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, FIELDS_NAME, FIELD_NAME,
26};
27use crate::Processor;
28
/// Kind name of the filter processor; returned by `kind()` and reported in
/// this processor's error messages.
pub(crate) const PROCESSOR_FILTER: &str = "filter";

/// YAML key selecting the match mode (only `"simple"` is recognized).
const MATCH_MODE_NAME: &str = "mode";
/// YAML key selecting the match operator (`"in"` or `"not_in"`).
const MATCH_OP_NAME: &str = "match_op";
/// YAML key toggling case-insensitive comparison.
const CASE_INSENSITIVE_NAME: &str = "case_insensitive";
/// YAML key holding the list of target strings to compare against.
const TARGETS_NAME: &str = "targets";
35
36#[derive(Debug)]
37enum MatchMode {
38 SimpleMatch(MatchOp),
39}
40
41impl Default for MatchMode {
42 fn default() -> Self {
43 Self::SimpleMatch(MatchOp::default())
44 }
45}
46
/// Operator applied by the simple match mode.
#[derive(Default, Debug)]
enum MatchOp {
    /// The input matches when it IS one of the targets (the default).
    #[default]
    In,
    /// The input matches when it is NOT one of the targets.
    NotIn,
}
53
54#[derive(Debug, Default)]
58pub struct FilterProcessor {
59 fields: Fields,
60 mode: MatchMode,
61 case_insensitive: bool,
62 targets: HashSet<String>,
63}
64
65impl TryFrom<&yaml_rust::yaml::Hash> for FilterProcessor {
66 type Error = Error;
67
68 #[allow(clippy::single_match)]
70 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
71 let mut fields = Fields::default();
72 let mut mode = MatchMode::default();
73 let mut op = MatchOp::default();
74 let mut case_insensitive = true;
75 let mut targets = HashSet::new();
76
77 for (k, v) in value.iter() {
78 let key = k
79 .as_str()
80 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
81 match key {
82 FIELD_NAME => fields = Fields::one(yaml_new_field(v, FIELD_NAME)?),
83 FIELDS_NAME => fields = yaml_new_fields(v, FIELDS_NAME)?,
84 MATCH_MODE_NAME => match yaml_string(v, MATCH_MODE_NAME)?.as_str() {
85 "simple" => mode = MatchMode::SimpleMatch(MatchOp::In),
86 _ => {}
87 },
88 MATCH_OP_NAME => match yaml_string(v, MATCH_OP_NAME)?.as_str() {
89 "in" => op = MatchOp::In,
90 "not_in" => op = MatchOp::NotIn,
91 _ => {}
92 },
93 CASE_INSENSITIVE_NAME => case_insensitive = yaml_bool(v, CASE_INSENSITIVE_NAME)?,
94 TARGETS_NAME => {
95 yaml_strings(v, TARGETS_NAME)?
96 .into_iter()
97 .filter(|s| !s.is_empty())
98 .for_each(|s| {
99 targets.insert(s);
100 });
101 }
102 _ => {}
103 }
104 }
105
106 if matches!(mode, MatchMode::SimpleMatch(_)) {
107 mode = MatchMode::SimpleMatch(op);
108 }
109
110 if targets.is_empty() {
111 return ProcessorMissingFieldSnafu {
112 processor: PROCESSOR_FILTER,
113 field: TARGETS_NAME.to_string(),
114 }
115 .fail();
116 }
117
118 if case_insensitive {
119 targets = targets.into_iter().map(|s| s.to_lowercase()).collect();
120 }
121
122 Ok(FilterProcessor {
123 fields,
124 mode,
125 case_insensitive,
126 targets,
127 })
128 }
129}
130
131impl FilterProcessor {
132 fn match_target(&self, input: &str) -> bool {
133 let input = if self.case_insensitive {
134 &input.to_lowercase()
135 } else {
136 input
137 };
138
139 match &self.mode {
140 MatchMode::SimpleMatch(op) => match op {
141 MatchOp::In => self.targets.contains(input),
142 MatchOp::NotIn => !self.targets.contains(input),
143 },
144 }
145 }
146}
147
148impl Processor for FilterProcessor {
149 fn kind(&self) -> &str {
150 PROCESSOR_FILTER
151 }
152
153 fn ignore_missing(&self) -> bool {
154 true
155 }
156
157 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
158 for field in self.fields.iter() {
159 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
160 let index = field.input_field();
161 match val.get(index) {
162 Some(VrlValue::Bytes(b)) => {
163 if self.match_target(&String::from_utf8_lossy(b)) {
164 return Ok(VrlValue::Null);
165 }
166 }
167 Some(v) => {
168 return ProcessorExpectStringSnafu {
169 processor: self.kind(),
170 v: v.clone(),
171 }
172 .fail();
173 }
174 None => {}
175 }
176 }
177
178 Ok(val)
179 }
180}
181
182#[cfg(test)]
183mod test {
184 use ahash::HashSet;
185 use vrl::prelude::{Bytes, Value as VrlValue};
186 use vrl::value::{KeyString, ObjectMap};
187
188 use crate::etl::field::{Field, Fields};
189 use crate::etl::processor::filter::{FilterProcessor, MatchMode, MatchOp};
190 use crate::Processor;
191
192 #[test]
193 fn test_eq() {
194 let processor = FilterProcessor {
195 fields: Fields::one(Field::new("name", None)),
196 mode: MatchMode::SimpleMatch(MatchOp::In),
197 case_insensitive: false,
198 targets: HashSet::from_iter(vec!["John".to_string()]),
199 };
200
201 let val = VrlValue::Object(ObjectMap::from_iter(vec![(
202 KeyString::from("name"),
203 VrlValue::Bytes(Bytes::from("John")),
204 )]));
205
206 let result = processor.exec_mut(val).unwrap();
207 assert_eq!(result, VrlValue::Null);
208
209 let val = VrlValue::Object(ObjectMap::from_iter(vec![(
210 KeyString::from("name"),
211 VrlValue::Bytes(Bytes::from("Wick")),
212 )]));
213 let expect = val.clone();
214 let result = processor.exec_mut(val).unwrap();
215 assert_eq!(result, expect);
216 }
217
218 #[test]
219 fn test_ne() {
220 let processor = FilterProcessor {
221 fields: Fields::one(Field::new("name", None)),
222 mode: MatchMode::SimpleMatch(MatchOp::NotIn),
223 case_insensitive: false,
224 targets: HashSet::from_iter(vec!["John".to_string()]),
225 };
226
227 let val = VrlValue::Object(ObjectMap::from_iter(vec![(
228 KeyString::from("name"),
229 VrlValue::Bytes(Bytes::from("John")),
230 )]));
231 let expect = val.clone();
232 let result = processor.exec_mut(val).unwrap();
233 assert_eq!(result, expect);
234
235 let val = VrlValue::Object(ObjectMap::from_iter(vec![(
236 KeyString::from("name"),
237 VrlValue::Bytes(Bytes::from("Wick")),
238 )]));
239 let result = processor.exec_mut(val).unwrap();
240 assert_eq!(result, VrlValue::Null);
241 }
242
243 #[test]
244 fn test_case() {
245 let processor = FilterProcessor {
246 fields: Fields::one(Field::new("name", None)),
247 mode: MatchMode::SimpleMatch(MatchOp::In),
248 case_insensitive: true,
249 targets: HashSet::from_iter(vec!["john".to_string()]),
250 };
251
252 let val = VrlValue::Object(ObjectMap::from_iter(vec![(
253 KeyString::from("name"),
254 VrlValue::Bytes(Bytes::from("JoHN")),
255 )]));
256 let result = processor.exec_mut(val).unwrap();
257 assert_eq!(result, VrlValue::Null);
258 }
259}