/// YAML key under which a list of regex patterns is configured.
const PATTERNS_NAME: &str = "patterns";

/// Processor kind identifier reported by the regex processor.
pub(crate) const PROCESSOR_REGEX: &str = "regex";
20
21use lazy_static::lazy_static;
22use regex::Regex;
23use snafu::{OptionExt, ResultExt};
24
25use crate::error::{
26 Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
27 RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
28 Result,
29};
30use crate::etl::field::Fields;
31use crate::etl::processor::{
32 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
33 FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
34};
35use crate::etl::value::Value;
36use crate::etl::PipelineMap;
37
38lazy_static! {
39 static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
40}
41
42fn get_regex_group_names(s: &str) -> Vec<String> {
43 GROUPS_NAME_REGEX
44 .captures_iter(s)
45 .filter_map(|c| c.get(1).map(|m| m.as_str().to_string()))
46 .collect()
47}
48
/// Builds an output key by joining `prefix` and `group` with an underscore,
/// e.g. `("a", "ar")` becomes `"a_ar"`.
fn generate_key(prefix: &str, group: &str) -> String {
    [prefix, group].join("_")
}
52
53#[derive(Debug)]
54struct GroupRegex {
55 origin: String,
56 regex: Regex,
57 groups: Vec<String>,
58}
59
60impl std::fmt::Display for GroupRegex {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 let groups = self.groups.join(", ");
63 write!(f, "{}, groups: [{groups}]", self.origin)
64 }
65}
66
67impl std::str::FromStr for GroupRegex {
68 type Err = Error;
69
70 fn from_str(origin: &str) -> Result<Self> {
71 let groups = get_regex_group_names(origin);
72 if groups.is_empty() {
73 return RegexNamedGroupNotFoundSnafu { origin }.fail();
74 }
75
76 let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?;
77 Ok(GroupRegex {
78 origin: origin.into(),
79 regex,
80 groups,
81 })
82 }
83}
84
85impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
86 type Error = Error;
87
88 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
89 let mut fields = Fields::default();
90 let mut patterns: Vec<GroupRegex> = vec![];
91 let mut ignore_missing = false;
92
93 for (k, v) in value.iter() {
94 let key = k
95 .as_str()
96 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
97 match key {
98 FIELD_NAME => {
99 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
100 }
101 FIELDS_NAME => {
102 fields = yaml_new_fields(v, FIELDS_NAME)?;
103 }
104 PATTERN_NAME => {
105 let pattern = yaml_string(v, PATTERN_NAME)?;
106 let gr = pattern.parse()?;
107 patterns.push(gr);
108 }
109 PATTERNS_NAME => {
110 for pattern in yaml_strings(v, PATTERNS_NAME)? {
111 let gr = pattern.parse()?;
112 patterns.push(gr);
113 }
114 }
115 IGNORE_MISSING_NAME => {
116 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
117 }
118 _ => {}
119 }
120 }
121
122 let processor_builder = RegexProcessor {
123 fields,
124 patterns,
125 ignore_missing,
126 };
127
128 processor_builder.check()
129 }
130}
131
132#[derive(Debug, Default)]
135pub struct RegexProcessor {
136 fields: Fields,
137 patterns: Vec<GroupRegex>,
138 ignore_missing: bool,
139}
140
141impl RegexProcessor {
142 fn check(self) -> Result<Self> {
143 if self.fields.is_empty() {
144 return RegexNoValidFieldSnafu {
145 processor: PROCESSOR_REGEX,
146 }
147 .fail();
148 }
149
150 if self.patterns.is_empty() {
151 return RegexNoValidPatternSnafu {
152 processor: PROCESSOR_REGEX,
153 }
154 .fail();
155 }
156
157 Ok(self)
158 }
159
160 fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<()> {
161 let mut rs = vec![];
162 for pattern in patterns {
163 let gr = pattern.parse()?;
164 rs.push(gr);
165 }
166 self.patterns = rs;
167 Ok(())
168 }
169
170 fn process(&self, prefix: &str, val: &str) -> Result<PipelineMap> {
171 let mut result = PipelineMap::new();
172 for gr in self.patterns.iter() {
173 if let Some(captures) = gr.regex.captures(val) {
174 for group in gr.groups.iter() {
175 if let Some(capture) = captures.name(group) {
176 let value = capture.as_str().to_string();
177 result.insert(generate_key(prefix, group), Value::String(value));
178 }
179 }
180 }
181 }
182 Ok(result)
183 }
184}
185
186impl Processor for RegexProcessor {
187 fn kind(&self) -> &str {
188 PROCESSOR_REGEX
189 }
190
191 fn ignore_missing(&self) -> bool {
192 self.ignore_missing
193 }
194
195 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
196 for field in self.fields.iter() {
197 let index = field.input_field();
198 let prefix = field.target_or_input_field();
199 match val.get(index) {
200 Some(Value::String(s)) => {
201 let result = self.process(prefix, s)?;
202 val.extend(result);
203 }
204 Some(Value::Null) | None => {
205 if !self.ignore_missing {
206 return ProcessorMissingFieldSnafu {
207 processor: self.kind(),
208 field: field.input_field(),
209 }
210 .fail();
211 }
212 }
213 Some(v) => {
214 return ProcessorExpectStringSnafu {
215 processor: self.kind(),
216 v: v.clone(),
217 }
218 .fail();
219 }
220 }
221 }
222
223 Ok(())
224 }
225}
#[cfg(test)]
mod tests {
    use ahash::{HashMap, HashMapExt};
    use itertools::Itertools;

    use super::*;
    use crate::etl::processor::regex::RegexProcessor;
    use crate::etl::value::{Map, Value};

    /// Loads the first YAML document in `yaml` and converts its top-level hash
    /// into a [`RegexProcessor`], panicking if any step fails.
    fn processor_from_yaml(yaml: &str) -> RegexProcessor {
        let document = yaml_rust::YamlLoader::load_from_str(yaml)
            .unwrap()
            .pop()
            .unwrap();
        let hash = document.as_hash().unwrap();
        RegexProcessor::try_from(hash).unwrap()
    }

    /// A single pattern with one named group extracts the first digit.
    #[test]
    fn test_simple_parse() {
        let pipeline_str = r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: false"#;

        let processor = processor_from_yaml(pipeline_str);

        let actual = Map {
            values: processor.process("a", "123").unwrap(),
        };

        let expected = Map {
            values: vec![("a_ar".to_string(), Value::String("1".to_string()))]
                .into_iter()
                .collect(),
        };

        assert_eq!(expected, actual);
    }

    /// Splits a breadcrumbs string into its sections, then extracts the
    /// individual attributes of every section.
    #[test]
    fn test_process() {
        let cc = "[c=c,n=US_CA_SANJOSE,o=55155]";
        let cg = "[a=12.34.567.89,b=12345678,c=g,n=US_CA_SANJOSE,o=20940]";
        let co = "[a=987.654.321.09,c=o]";
        let cp = "[c=p,n=US_CA_SANJOSE,o=55155]";
        let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
        let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");

        // Expected output of the first stage and input of the second stage.
        let temporary_map: PipelineMap = [
            ("breadcrumbs_parent", Value::String(cc.to_string())),
            ("breadcrumbs_edge", Value::String(cg.to_string())),
            ("breadcrumbs_origin", Value::String(co.to_string())),
            ("breadcrumbs_peer", Value::String(cp.to_string())),
            ("breadcrumbs_wrapper", Value::String(cw.to_string())),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v))
        .collect();

        // Stage 1: one field, one pattern per breadcrumb section.
        {
            let pipeline_str = r#"fields: ["breadcrumbs"]
patterns:
  - '(?<parent>\[[^\[]*c=c[^\]]*\])'
  - '(?<edge>\[[^\[]*c=g[^\]]*\])'
  - '(?<origin>\[[^\[]*c=o[^\]]*\])'
  - '(?<peer>\[[^\[]*c=p[^\]]*\])'
  - '(?<wrapper>\[[^\[]*c=w[^\]]*\])'
ignore_missing: false"#;

            let processor = processor_from_yaml(pipeline_str);

            let sections = processor.process("breadcrumbs", &breadcrumbs_str).unwrap();

            assert_eq!(temporary_map, sections);
        }

        // Stage 2: several renamed fields, one pattern per attribute.
        {
            let pipeline_str = r#"fields:
  - breadcrumbs_parent, parent
  - breadcrumbs_edge, edge
  - breadcrumbs_origin, origin
  - breadcrumbs_peer, peer
  - breadcrumbs_wrapper, wrapper
patterns:
  - 'a=(?<ip>[^,\]]+)'
  - 'b=(?<request_id>[^,\]]+)'
  - 'k=(?<request_end_time>[^,\]]+)'
  - 'l=(?<turn_around_time>[^,\]]+)'
  - 'm=(?<dns_lookup_time>[^,\]]+)'
  - 'n=(?<geo>[^,\]]+)'
  - 'o=(?<asn>[^,\]]+)'
ignore_missing: false"#;

            let processor = processor_from_yaml(pipeline_str);

            let mut result = HashMap::new();
            for field in processor.fields.iter() {
                let input = temporary_map
                    .get(field.input_field())
                    .unwrap()
                    .to_str_value();
                let extracted = processor
                    .process(field.target_or_input_field(), &input)
                    .unwrap();

                result.extend(extracted);
            }

            let new_values = vec![
                ("edge_ip", Value::String("12.34.567.89".to_string())),
                ("edge_request_id", Value::String("12345678".to_string())),
                ("edge_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("edge_asn", Value::String("20940".to_string())),
                ("origin_ip", Value::String("987.654.321.09".to_string())),
                ("peer_asn", Value::String("55155".to_string())),
                ("peer_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("parent_asn", Value::String("55155".to_string())),
                ("parent_geo", Value::String("US_CA_SANJOSE".to_string())),
                ("wrapper_asn", Value::String("55155".to_string())),
                ("wrapper_geo", Value::String("US_CA_SANJOSE".to_string())),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect();

            assert_eq!(result, new_values);
        }
    }
}