pipeline/etl/processor/
regex.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// Each `fields` entry is an input field name optionally followed by a prefix,
// separated by a comma, e.g.:
//     name, new_name
/// YAML key under which the processor configuration lists several regex patterns.
17const PATTERNS_NAME: &str = "patterns";
18
/// Kind name of the regex processor; returned by `kind()` and attached to its errors.
19pub(crate) const PROCESSOR_REGEX: &str = "regex";
20
21use lazy_static::lazy_static;
22use regex::Regex;
23use snafu::{OptionExt, ResultExt};
24
25use crate::error::{
26    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
27    RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
28    Result,
29};
30use crate::etl::field::Fields;
31use crate::etl::processor::{
32    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
33    FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
34};
35use crate::etl::value::Value;
36use crate::etl::PipelineMap;
37
38lazy_static! {
39    static ref GROUPS_NAME_REGEX: Regex = Regex::new(r"\(\?P?<([[:word:]]+)>.+?\)").unwrap();
40}
41
42fn get_regex_group_names(s: &str) -> Vec<String> {
43    GROUPS_NAME_REGEX
44        .captures_iter(s)
45        .filter_map(|c| c.get(1).map(|m| m.as_str().to_string()))
46        .collect()
47}
48
/// Builds the output key for a captured group: `<prefix>_<group>`.
fn generate_key(prefix: &str, group: &str) -> String {
    let mut key = String::with_capacity(prefix.len() + group.len() + 1);
    key.push_str(prefix);
    key.push('_');
    key.push_str(group);
    key
}
52
53#[derive(Debug)]
54struct GroupRegex {
55    origin: String,
56    regex: Regex,
57    groups: Vec<String>,
58}
59
60impl std::fmt::Display for GroupRegex {
61    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62        let groups = self.groups.join(", ");
63        write!(f, "{}, groups: [{groups}]", self.origin)
64    }
65}
66
67impl std::str::FromStr for GroupRegex {
68    type Err = Error;
69
70    fn from_str(origin: &str) -> Result<Self> {
71        let groups = get_regex_group_names(origin);
72        if groups.is_empty() {
73            return RegexNamedGroupNotFoundSnafu { origin }.fail();
74        }
75
76        let regex = Regex::new(origin).context(RegexSnafu { pattern: origin })?;
77        Ok(GroupRegex {
78            origin: origin.into(),
79            regex,
80            groups,
81        })
82    }
83}
84
85impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
86    type Error = Error;
87
88    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
89        let mut fields = Fields::default();
90        let mut patterns: Vec<GroupRegex> = vec![];
91        let mut ignore_missing = false;
92
93        for (k, v) in value.iter() {
94            let key = k
95                .as_str()
96                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
97            match key {
98                FIELD_NAME => {
99                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
100                }
101                FIELDS_NAME => {
102                    fields = yaml_new_fields(v, FIELDS_NAME)?;
103                }
104                PATTERN_NAME => {
105                    let pattern = yaml_string(v, PATTERN_NAME)?;
106                    let gr = pattern.parse()?;
107                    patterns.push(gr);
108                }
109                PATTERNS_NAME => {
110                    for pattern in yaml_strings(v, PATTERNS_NAME)? {
111                        let gr = pattern.parse()?;
112                        patterns.push(gr);
113                    }
114                }
115                IGNORE_MISSING_NAME => {
116                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
117                }
118                _ => {}
119            }
120        }
121
122        let processor_builder = RegexProcessor {
123            fields,
124            patterns,
125            ignore_missing,
126        };
127
128        processor_builder.check()
129    }
130}
131
132/// only support string value
133/// if no value found from a pattern, the target_field will be ignored
134#[derive(Debug, Default)]
135pub struct RegexProcessor {
136    fields: Fields,
137    patterns: Vec<GroupRegex>,
138    ignore_missing: bool,
139}
140
141impl RegexProcessor {
142    fn check(self) -> Result<Self> {
143        if self.fields.is_empty() {
144            return RegexNoValidFieldSnafu {
145                processor: PROCESSOR_REGEX,
146            }
147            .fail();
148        }
149
150        if self.patterns.is_empty() {
151            return RegexNoValidPatternSnafu {
152                processor: PROCESSOR_REGEX,
153            }
154            .fail();
155        }
156
157        Ok(self)
158    }
159
160    fn try_with_patterns(&mut self, patterns: Vec<String>) -> Result<()> {
161        let mut rs = vec![];
162        for pattern in patterns {
163            let gr = pattern.parse()?;
164            rs.push(gr);
165        }
166        self.patterns = rs;
167        Ok(())
168    }
169
170    fn process(&self, prefix: &str, val: &str) -> Result<PipelineMap> {
171        let mut result = PipelineMap::new();
172        for gr in self.patterns.iter() {
173            if let Some(captures) = gr.regex.captures(val) {
174                for group in gr.groups.iter() {
175                    if let Some(capture) = captures.name(group) {
176                        let value = capture.as_str().to_string();
177                        result.insert(generate_key(prefix, group), Value::String(value));
178                    }
179                }
180            }
181        }
182        Ok(result)
183    }
184}
185
186impl Processor for RegexProcessor {
187    fn kind(&self) -> &str {
188        PROCESSOR_REGEX
189    }
190
191    fn ignore_missing(&self) -> bool {
192        self.ignore_missing
193    }
194
195    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
196        for field in self.fields.iter() {
197            let index = field.input_field();
198            let prefix = field.target_or_input_field();
199            match val.get(index) {
200                Some(Value::String(s)) => {
201                    let result = self.process(prefix, s)?;
202                    val.extend(result);
203                }
204                Some(Value::Null) | None => {
205                    if !self.ignore_missing {
206                        return ProcessorMissingFieldSnafu {
207                            processor: self.kind(),
208                            field: field.input_field(),
209                        }
210                        .fail();
211                    }
212                }
213                Some(v) => {
214                    return ProcessorExpectStringSnafu {
215                        processor: self.kind(),
216                        v: v.clone(),
217                    }
218                    .fail();
219                }
220            }
221        }
222
223        Ok(())
224    }
225}
#[cfg(test)]
mod tests {
    use ahash::{HashMap, HashMapExt};
    use itertools::Itertools;

    use super::*;
    use crate::etl::processor::regex::RegexProcessor;
    use crate::etl::value::{Map, Value};

    /// Builds a `RegexProcessor` from a YAML snippet describing one processor.
    fn processor_from_yaml(yaml: &str) -> RegexProcessor {
        let doc = yaml_rust::YamlLoader::load_from_str(yaml)
            .unwrap()
            .pop()
            .unwrap();
        RegexProcessor::try_from(doc.as_hash().unwrap()).unwrap()
    }

    /// Turns `(key, text)` pairs into a collection of `(String, Value::String)` entries.
    fn string_entries<C>(pairs: &[(&str, &str)]) -> C
    where
        C: std::iter::FromIterator<(String, Value)>,
    {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), Value::String((*v).to_string())))
            .collect()
    }

    #[test]
    fn test_simple_parse() {
        let processor = processor_from_yaml(
            r#"fields: ["a"]
patterns: ['(?<ar>\d)']
ignore_missing: false"#,
        );

        // single field (with prefix), single pattern
        let actual = Map {
            values: processor.process("a", "123").unwrap(),
        };
        let expected = Map {
            values: string_entries(&[("a_ar", "1")]),
        };

        assert_eq!(expected, actual);
    }

    #[test]
    fn test_process() {
        let cc = "[c=c,n=US_CA_SANJOSE,o=55155]";
        let cg = "[a=12.34.567.89,b=12345678,c=g,n=US_CA_SANJOSE,o=20940]";
        let co = "[a=987.654.321.09,c=o]";
        let cp = "[c=p,n=US_CA_SANJOSE,o=55155]";
        let cw = "[c=w,n=US_CA_SANJOSE,o=55155]";
        let breadcrumbs_str = [cc, cg, co, cp, cw].iter().join(",");

        let temporary_map: PipelineMap = string_entries(&[
            ("breadcrumbs_parent", cc),
            ("breadcrumbs_edge", cg),
            ("breadcrumbs_origin", co),
            ("breadcrumbs_peer", cp),
            ("breadcrumbs_wrapper", cw),
        ]);

        // single field (with prefix), multiple patterns
        {
            let processor = processor_from_yaml(
                r#"fields: ["breadcrumbs"]
patterns:
  - '(?<parent>\[[^\[]*c=c[^\]]*\])'
  - '(?<edge>\[[^\[]*c=g[^\]]*\])'
  - '(?<origin>\[[^\[]*c=o[^\]]*\])'
  - '(?<peer>\[[^\[]*c=p[^\]]*\])'
  - '(?<wrapper>\[[^\[]*c=w[^\]]*\])'
ignore_missing: false"#,
            );

            let result = processor.process("breadcrumbs", &breadcrumbs_str).unwrap();

            assert_eq!(temporary_map, result);
        }

        // multiple fields (with prefix), multiple patterns
        {
            let processor = processor_from_yaml(
                r#"fields:
  - breadcrumbs_parent, parent
  - breadcrumbs_edge, edge
  - breadcrumbs_origin, origin
  - breadcrumbs_peer, peer
  - breadcrumbs_wrapper, wrapper
patterns:
  - 'a=(?<ip>[^,\]]+)'
  - 'b=(?<request_id>[^,\]]+)'
  - 'k=(?<request_end_time>[^,\]]+)'
  - 'l=(?<turn_around_time>[^,\]]+)'
  - 'm=(?<dns_lookup_time>[^,\]]+)'
  - 'n=(?<geo>[^,\]]+)'
  - 'o=(?<asn>[^,\]]+)'
ignore_missing: false"#,
            );

            // Feed each field's input value through the processor and merge the output.
            let mut result = HashMap::new();
            for field in processor.fields.iter() {
                let input = temporary_map
                    .get(field.input_field())
                    .unwrap()
                    .to_str_value();
                let extracted = processor
                    .process(field.target_or_input_field(), &input)
                    .unwrap();
                result.extend(extracted);
            }

            let new_values: HashMap<String, Value> = string_entries(&[
                ("edge_ip", "12.34.567.89"),
                ("edge_request_id", "12345678"),
                ("edge_geo", "US_CA_SANJOSE"),
                ("edge_asn", "20940"),
                ("origin_ip", "987.654.321.09"),
                ("peer_asn", "55155"),
                ("peer_geo", "US_CA_SANJOSE"),
                ("parent_asn", "55155"),
                ("parent_geo", "US_CA_SANJOSE"),
                ("wrapper_asn", "55155"),
                ("wrapper_geo", "US_CA_SANJOSE"),
            ]);

            assert_eq!(result, new_values);
        }
    }
}
366}