/// YAML key under which a list of regex patterns is configured for this processor.
const PATTERNS_NAME: &str = "patterns";

/// Name of the regex processor; returned by `kind()` and attached to the
/// errors raised while validating the processor.
pub(crate) const PROCESSOR_REGEX: &str = "regex";
20
21use std::collections::BTreeMap;
22
23use lazy_static::lazy_static;
24use regex::Regex;
25use snafu::{OptionExt, ResultExt};
26use vrl::prelude::Bytes;
27use vrl::value::{KeyString, Value as VrlValue};
28
29use crate::error::{
30 Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
31 RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
32 Result, ValueMustBeMapSnafu,
33};
34use crate::etl::field::Fields;
35use crate::etl::processor::{
36 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
37 FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
38};
39
40lazy_static! {
41 static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
42}
43
44fn get_regex_group_names(s: &str) -> Vec<String> {
45 GROUPS_NAME_REGEX
46 .captures_iter(s)
47 .filter_map(|c| c.get(1).map(|m| m.as_str().to_string()))
48 .collect()
49}
50
/// Builds the output key for a captured group by joining `prefix` and
/// `group` with a single underscore.
fn generate_key(prefix: &str, group: &str) -> String {
    [prefix, group].join("_")
}
54
/// A compiled pattern bundled with the names of its named capture groups.
#[derive(Debug)]
struct GroupRegex {
    /// Pattern text exactly as it was supplied; shown by the `Display` impl.
    origin: String,
    /// Regex compiled from `origin`.
    regex: Regex,
    /// Named capture groups extracted from `origin`; these are the names
    /// looked up in each match when the pattern is applied.
    groups: Vec<String>,
}
61
62impl std::fmt::Display for GroupRegex {
63 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64 let groups = self.groups.join(", ");
65 write!(f, "{}, groups: [{groups}]", self.origin)
66 }
67}
68
69impl std::str::FromStr for GroupRegex {
70 type Err = Error;
71
72 fn from_str(origin: &str) -> Result<Self> {
73 let groups = get_regex_group_names(origin);
74 if groups.is_empty() {
75 return RegexNamedGroupNotFoundSnafu { origin }.fail();
76 }
77
78 let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?;
79 Ok(GroupRegex {
80 origin: origin.into(),
81 regex,
82 groups,
83 })
84 }
85}
86
87impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
88 type Error = Error;
89
90 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
91 let mut fields = Fields::default();
92 let mut patterns: Vec<GroupRegex> = vec![];
93 let mut ignore_missing = false;
94
95 for (k, v) in value.iter() {
96 let key = k
97 .as_str()
98 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
99 match key {
100 FIELD_NAME => {
101 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
102 }
103 FIELDS_NAME => {
104 fields = yaml_new_fields(v, FIELDS_NAME)?;
105 }
106 PATTERN_NAME => {
107 let pattern = yaml_string(v, PATTERN_NAME)?;
108 let gr = pattern.parse()?;
109 patterns.push(gr);
110 }
111 PATTERNS_NAME => {
112 for pattern in yaml_strings(v, PATTERNS_NAME)? {
113 let gr = pattern.parse()?;
114 patterns.push(gr);
115 }
116 }
117 IGNORE_MISSING_NAME => {
118 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
119 }
120 _ => {}
121 }
122 }
123
124 let processor_builder = RegexProcessor {
125 fields,
126 patterns,
127 ignore_missing,
128 };
129
130 processor_builder.check()
131 }
132}
133
/// Processor that applies regex patterns to string fields and stores every
/// matched named capture group under a `<prefix>_<group>` key.
#[derive(Debug, Default)]
pub struct RegexProcessor {
    /// Fields whose values the patterns are applied to.
    fields: Fields,
    /// Patterns applied, in order, to each field value.
    patterns: Vec<GroupRegex>,
    /// When `true`, a missing or null input field is skipped instead of
    /// producing an error.
    ignore_missing: bool,
}
142
143impl RegexProcessor {
144 fn check(self) -> Result<Self> {
145 if self.fields.is_empty() {
146 return RegexNoValidFieldSnafu {
147 processor: PROCESSOR_REGEX,
148 }
149 .fail();
150 }
151
152 if self.patterns.is_empty() {
153 return RegexNoValidPatternSnafu {
154 processor: PROCESSOR_REGEX,
155 }
156 .fail();
157 }
158
159 Ok(self)
160 }
161
162 fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<()> {
163 let mut rs = vec![];
164 for pattern in patterns {
165 let gr = pattern.parse()?;
166 rs.push(gr);
167 }
168 self.patterns = rs;
169 Ok(())
170 }
171
172 fn process(&self, prefix: &str, val: &str) -> Result<BTreeMap<KeyString, VrlValue>> {
173 let mut result = BTreeMap::new();
174 for gr in self.patterns.iter() {
175 if let Some(captures) = gr.regex.captures(val) {
176 for group in gr.groups.iter() {
177 if let Some(capture) = captures.name(group) {
178 let value = capture.as_str().to_string();
179 result.insert(
180 KeyString::from(generate_key(prefix, group)),
181 VrlValue::Bytes(Bytes::from(value)),
182 );
183 }
184 }
185 }
186 }
187 Ok(result)
188 }
189}
190
191impl Processor for RegexProcessor {
192 fn kind(&self) -> &str {
193 PROCESSOR_REGEX
194 }
195
196 fn ignore_missing(&self) -> bool {
197 self.ignore_missing
198 }
199
200 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
201 for field in self.fields.iter() {
202 let index = field.input_field();
203 let prefix = field.target_or_input_field();
204 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
205 match val.get(index) {
206 Some(VrlValue::Bytes(s)) => {
207 let result = self.process(prefix, String::from_utf8_lossy(s).as_ref())?;
208 val.extend(result);
209 }
210 Some(VrlValue::Null) | None => {
211 if !self.ignore_missing {
212 return ProcessorMissingFieldSnafu {
213 processor: self.kind(),
214 field: field.input_field(),
215 }
216 .fail();
217 }
218 }
219 Some(v) => {
220 return ProcessorExpectStringSnafu {
221 processor: self.kind(),
222 v: v.clone(),
223 }
224 .fail();
225 }
226 }
227 }
228
229 Ok(val)
230 }
231}
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use vrl::value::Value as VrlValue;

    use super::*;
    use crate::etl::processor::regex::RegexProcessor;

    /// Builds a processor from a YAML document, panicking on any failure.
    fn processor_from_yaml(yaml: &str) -> RegexProcessor {
        let document = yaml_rust::YamlLoader::load_from_str(yaml)
            .unwrap()
            .pop()
            .unwrap();
        RegexProcessor::try_from(document.as_hash().unwrap()).unwrap()
    }

    /// Converts `(key, text)` pairs into the map shape produced by `process`.
    fn bytes_map(pairs: &[(&str, &str)]) -> BTreeMap<KeyString, VrlValue> {
        pairs
            .iter()
            .map(|(key, text)| {
                (
                    KeyString::from(*key),
                    VrlValue::Bytes(Bytes::from(text.to_string())),
                )
            })
            .collect()
    }

    #[test]
    fn test_simple_parse() {
        let processor = processor_from_yaml(
            r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: false"#,
        );

        let result = processor.process("a", "123").unwrap();

        assert_eq!(bytes_map(&[("a_ar", "1")]), result);
    }

    #[test]
    fn test_process() {
        let cc = "[c=c,n=US_CA_SANJOSE,o=55155]";
        let cg = "[a=12.34.567.89,b=12345678,c=g,n=US_CA_SANJOSE,o=20940]";
        let co = "[a=987.654.321.09,c=o]";
        let cp = "[c=p,n=US_CA_SANJOSE,o=55155]";
        let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
        let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");

        let temporary_map = bytes_map(&[
            ("breadcrumbs_parent", cc),
            ("breadcrumbs_edge", cg),
            ("breadcrumbs_origin", co),
            ("breadcrumbs_peer", cp),
            ("breadcrumbs_wrapper", cw),
        ]);

        // Stage one: split the joined breadcrumbs into one entry per segment.
        {
            let processor = processor_from_yaml(
                r#"fields: ["breadcrumbs"]
patterns:
 - '(?<parent>\[[^\[]*c=c[^\]]*\])'
 - '(?<edge>\[[^\[]*c=g[^\]]*\])'
 - '(?<origin>\[[^\[]*c=o[^\]]*\])'
 - '(?<peer>\[[^\[]*c=p[^\]]*\])'
 - '(?<wrapper>\[[^\[]*c=w[^\]]*\])'
ignore_missing: false"#,
            );

            let result = processor.process("breadcrumbs", &breadcrumbs_str).unwrap();

            assert_eq!(temporary_map, result);
        }

        // Stage two: pull individual attributes out of each segment.
        {
            let processor = processor_from_yaml(
                r#"fields:
 - breadcrumbs_parent, parent
 - breadcrumbs_edge, edge
 - breadcrumbs_origin, origin
 - breadcrumbs_peer, peer
 - breadcrumbs_wrapper, wrapper
patterns:
 - 'a=(?<ip>[^,\]]+)'
 - 'b=(?<request_id>[^,\]]+)'
 - 'k=(?<request_end_time>[^,\]]+)'
 - 'l=(?<turn_around_time>[^,\]]+)'
 - 'm=(?<dns_lookup_time>[^,\]]+)'
 - 'n=(?<geo>[^,\]]+)'
 - 'o=(?<asn>[^,\]]+)'
ignore_missing: false"#,
            );

            let mut result = BTreeMap::new();
            for field in processor.fields.iter() {
                let segment = temporary_map.get(field.input_field()).unwrap();
                let segment = segment.to_string_lossy();
                let extracted = processor
                    .process(field.target_or_input_field(), segment.as_ref())
                    .unwrap();
                result.extend(extracted);
            }

            let new_values = bytes_map(&[
                ("edge_ip", "12.34.567.89"),
                ("edge_request_id", "12345678"),
                ("edge_geo", "US_CA_SANJOSE"),
                ("edge_asn", "20940"),
                ("origin_ip", "987.654.321.09"),
                ("peer_asn", "55155"),
                ("peer_geo", "US_CA_SANJOSE"),
                ("parent_asn", "55155"),
                ("parent_geo", "US_CA_SANJOSE"),
                ("wrapper_asn", "55155"),
                ("wrapper_geo", "US_CA_SANJOSE"),
            ]);

            assert_eq!(result, new_values);
        }
    }
}