// pipeline/etl/processor/regex.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Note on `fields` entries (not on the constants below): each entry is a field name
// and a prefix separated by a comma, like: `name, new_name`.

/// YAML key holding a list of regex patterns; every pattern must contain at least
/// one named capture group.
const PATTERNS_NAME: &str = "patterns";

/// Kind name of this processor, returned by `RegexProcessor::kind`.
pub(crate) const PROCESSOR_REGEX: &str = "regex";
20
21use std::collections::BTreeMap;
22
23use lazy_static::lazy_static;
24use regex::Regex;
25use snafu::{OptionExt, ResultExt};
26use vrl::prelude::Bytes;
27use vrl::value::{KeyString, Value as VrlValue};
28
29use crate::error::{
30    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
31    RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
32    Result, ValueMustBeMapSnafu,
33};
34use crate::etl::field::Fields;
35use crate::etl::processor::{
36    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
37    FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
38};
39
40lazy_static! {
41    static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
42}
43
44fn get_regex_group_names(s: &str) -> Vec<String> {
45    GROUPS_NAME_REGEX
46        .captures_iter(s)
47        .filter_map(|c| c.get(1).map(|m| m.as_str().to_string()))
48        .collect()
49}
50
/// Builds the output key for a captured group: `<prefix>_<group>`.
fn generate_key(prefix: &str, group: &str) -> String {
    [prefix, group].join("_")
}
54
/// A compiled regex pattern together with the names of its named capture groups.
#[derive(Debug)]
struct GroupRegex {
    /// The pattern text exactly as passed to `from_str`; used for display.
    origin: String,
    /// The compiled form of `origin`.
    regex: Regex,
    /// Named capture groups of `origin`; `RegexProcessor::process` looks each one up
    /// in the captures of a successful match.
    groups: Vec<String>,
}
61
62impl std::fmt::Display for GroupRegex {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        let groups = self.groups.join(", ");
65        write!(f, "{}, groups: [{groups}]", self.origin)
66    }
67}
68
69impl std::str::FromStr for GroupRegex {
70    type Err = Error;
71
72    fn from_str(origin: &str) -> Result<Self> {
73        let groups = get_regex_group_names(origin);
74        if groups.is_empty() {
75            return RegexNamedGroupNotFoundSnafu { origin }.fail();
76        }
77
78        let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?;
79        Ok(GroupRegex {
80            origin: origin.into(),
81            regex,
82            groups,
83        })
84    }
85}
86
87impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
88    type Error = Error;
89
90    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
91        let mut fields = Fields::default();
92        let mut patterns: Vec<GroupRegex> = vec![];
93        let mut ignore_missing = false;
94
95        for (k, v) in value.iter() {
96            let key = k
97                .as_str()
98                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
99            match key {
100                FIELD_NAME => {
101                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
102                }
103                FIELDS_NAME => {
104                    fields = yaml_new_fields(v, FIELDS_NAME)?;
105                }
106                PATTERN_NAME => {
107                    let pattern = yaml_string(v, PATTERN_NAME)?;
108                    let gr = pattern.parse()?;
109                    patterns.push(gr);
110                }
111                PATTERNS_NAME => {
112                    for pattern in yaml_strings(v, PATTERNS_NAME)? {
113                        let gr = pattern.parse()?;
114                        patterns.push(gr);
115                    }
116                }
117                IGNORE_MISSING_NAME => {
118                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
119                }
120                _ => {}
121            }
122        }
123
124        let processor_builder = RegexProcessor {
125            fields,
126            patterns,
127            ignore_missing,
128        };
129
130        processor_builder.check()
131    }
132}
133
/// Extracts named capture groups from string fields.
///
/// For every configured field, each pattern is run against the field's string value
/// (decoded lossily as UTF-8). Every named group that takes part in a match is written
/// back into the object under the key `<prefix>_<group>`, where `<prefix>` is the field's
/// `target_or_input_field()`; existing keys with that name are overwritten.
///
/// Only string values are supported. If no value is found for a group, no key is
/// written for it. When several patterns define the same group name, the last pattern
/// that matches wins.
#[derive(Debug, Default)]
pub struct RegexProcessor {
    /// Fields to read; each field's prefix names the generated keys.
    fields: Fields,
    /// Patterns applied, in order, to every field value.
    patterns: Vec<GroupRegex>,
    /// When true, a missing or null field is skipped instead of raising an error.
    ignore_missing: bool,
}
142
143impl RegexProcessor {
144    fn check(self) -> Result<Self> {
145        if self.fields.is_empty() {
146            return RegexNoValidFieldSnafu {
147                processor: PROCESSOR_REGEX,
148            }
149            .fail();
150        }
151
152        if self.patterns.is_empty() {
153            return RegexNoValidPatternSnafu {
154                processor: PROCESSOR_REGEX,
155            }
156            .fail();
157        }
158
159        Ok(self)
160    }
161
162    fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<()> {
163        let mut rs = vec![];
164        for pattern in patterns {
165            let gr = pattern.parse()?;
166            rs.push(gr);
167        }
168        self.patterns = rs;
169        Ok(())
170    }
171
172    fn process(&self, prefix: &str, val: &str) -> Result<BTreeMap<KeyString, VrlValue>> {
173        let mut result = BTreeMap::new();
174        for gr in self.patterns.iter() {
175            if let Some(captures) = gr.regex.captures(val) {
176                for group in gr.groups.iter() {
177                    if let Some(capture) = captures.name(group) {
178                        let value = capture.as_str().to_string();
179                        result.insert(
180                            KeyString::from(generate_key(prefix, group)),
181                            VrlValue::Bytes(Bytes::from(value)),
182                        );
183                    }
184                }
185            }
186        }
187        Ok(result)
188    }
189}
190
191impl Processor for RegexProcessor {
192    fn kind(&self) -> &str {
193        PROCESSOR_REGEX
194    }
195
196    fn ignore_missing(&self) -> bool {
197        self.ignore_missing
198    }
199
200    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
201        for field in self.fields.iter() {
202            let index = field.input_field();
203            let prefix = field.target_or_input_field();
204            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
205            match val.get(index) {
206                Some(VrlValue::Bytes(s)) => {
207                    let result = self.process(prefix, String::from_utf8_lossy(s).as_ref())?;
208                    val.extend(result);
209                }
210                Some(VrlValue::Null) | None => {
211                    if !self.ignore_missing {
212                        return ProcessorMissingFieldSnafu {
213                            processor: self.kind(),
214                            field: field.input_field(),
215                        }
216                        .fail();
217                    }
218                }
219                Some(v) => {
220                    return ProcessorExpectStringSnafu {
221                        processor: self.kind(),
222                        v: v.clone(),
223                    }
224                    .fail();
225                }
226            }
227        }
228
229        Ok(val)
230    }
231}
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;

    /// Parses a YAML processor definition, panicking if it is not a valid `RegexProcessor`.
    fn parse_processor(pipeline_str: &str) -> RegexProcessor {
        let processor_yaml = yaml_rust::YamlLoader::load_from_str(pipeline_str)
            .unwrap()
            .pop()
            .unwrap();
        RegexProcessor::try_from(processor_yaml.as_hash().unwrap()).unwrap()
    }

    /// Builds a map whose values are all `VrlValue::Bytes`, the shape `process` returns.
    fn bytes_map<'a>(
        pairs: impl IntoIterator<Item = (&'a str, &'a str)>,
    ) -> BTreeMap<KeyString, VrlValue> {
        pairs
            .into_iter()
            .map(|(k, v)| (KeyString::from(k), VrlValue::Bytes(Bytes::from(v.to_string()))))
            .collect()
    }

    #[test]
    fn test_simple_parse() {
        let processor = parse_processor(
            r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: false"#,
        );

        // single field, single pattern: only the first digit is captured
        let result = processor.process("a", "123").unwrap();

        assert_eq!(bytes_map([("a_ar", "1")]), result);
    }

    #[test]
    fn test_process() {
        let cc = "[c=c,n=US_CA_SANJOSE,o=55155]";
        let cg = "[a=12.34.567.89,b=12345678,c=g,n=US_CA_SANJOSE,o=20940]";
        let co = "[a=987.654.321.09,c=o]";
        let cp = "[c=p,n=US_CA_SANJOSE,o=55155]";
        let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
        let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");

        let temporary_map = bytes_map([
            ("breadcrumbs_parent", cc),
            ("breadcrumbs_edge", cg),
            ("breadcrumbs_origin", co),
            ("breadcrumbs_peer", cp),
            ("breadcrumbs_wrapper", cw),
        ]);

        {
            // single field (with prefix), multiple patterns
            let processor = parse_processor(
                r#"fields: ["breadcrumbs"]
patterns:
  - '(?<parent>\[[^\[]*c=c[^\]]*\])'
  - '(?<edge>\[[^\[]*c=g[^\]]*\])'
  - '(?<origin>\[[^\[]*c=o[^\]]*\])'
  - '(?<peer>\[[^\[]*c=p[^\]]*\])'
  - '(?<wrapper>\[[^\[]*c=w[^\]]*\])'
ignore_missing: false"#,
            );

            let result = processor.process("breadcrumbs", &breadcrumbs_str).unwrap();

            assert_eq!(temporary_map, result);
        }

        {
            // multiple fields (with prefix), multiple patterns
            let processor = parse_processor(
                r#"fields:
  - breadcrumbs_parent, parent
  - breadcrumbs_edge, edge
  - breadcrumbs_origin, origin
  - breadcrumbs_peer, peer
  - breadcrumbs_wrapper, wrapper
patterns:
  - 'a=(?<ip>[^,\]]+)'
  - 'b=(?<request_id>[^,\]]+)'
  - 'k=(?<request_end_time>[^,\]]+)'
  - 'l=(?<turn_around_time>[^,\]]+)'
  - 'm=(?<dns_lookup_time>[^,\]]+)'
  - 'n=(?<geo>[^,\]]+)'
  - 'o=(?<asn>[^,\]]+)'
ignore_missing: false"#,
            );

            let mut result = BTreeMap::new();
            for field in processor.fields.iter() {
                let s = temporary_map.get(field.input_field()).unwrap();
                let s = s.to_string_lossy();
                let prefix = field.target_or_input_field();

                result.extend(processor.process(prefix, s.as_ref()).unwrap());
            }

            let new_values = bytes_map([
                ("edge_ip", "12.34.567.89"),
                ("edge_request_id", "12345678"),
                ("edge_geo", "US_CA_SANJOSE"),
                ("edge_asn", "20940"),
                ("origin_ip", "987.654.321.09"),
                ("peer_asn", "55155"),
                ("peer_geo", "US_CA_SANJOSE"),
                ("parent_asn", "55155"),
                ("parent_geo", "US_CA_SANJOSE"),
                ("wrapper_asn", "55155"),
                ("wrapper_geo", "US_CA_SANJOSE"),
            ]);

            assert_eq!(result, new_values);
        }
    }

    #[test]
    fn test_exec_mut() {
        let processor = parse_processor(
            r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: false"#,
        );

        // the extracted group is added next to the untouched source field
        let output = processor
            .exec_mut(VrlValue::Object(bytes_map([("a", "123")])))
            .unwrap();
        assert_eq!(
            output,
            VrlValue::Object(bytes_map([("a", "123"), ("a_ar", "1")]))
        );

        // a missing field is an error when ignore_missing is false
        assert!(processor
            .exec_mut(VrlValue::Object(BTreeMap::new()))
            .is_err());

        // only string values are supported
        let non_string = [(KeyString::from("a"), VrlValue::Integer(1))]
            .into_iter()
            .collect::<BTreeMap<KeyString, VrlValue>>();
        assert!(processor.exec_mut(VrlValue::Object(non_string)).is_err());

        // the input itself must be an object
        assert!(processor.exec_mut(VrlValue::Null).is_err());
    }

    #[test]
    fn test_exec_mut_ignore_missing() {
        let processor = parse_processor(
            r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: true"#,
        );

        // missing and null fields are skipped and leave the object unchanged
        let empty = VrlValue::Object(BTreeMap::new());
        assert_eq!(processor.exec_mut(empty.clone()).unwrap(), empty);

        let with_null = VrlValue::Object(
            [(KeyString::from("a"), VrlValue::Null)]
                .into_iter()
                .collect::<BTreeMap<KeyString, VrlValue>>(),
        );
        assert_eq!(processor.exec_mut(with_null.clone()).unwrap(), with_null);
    }
}