pipeline/etl/processor/
digest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Digest the input string by removing certain patterns.
//!
//! This processor extracts the stable part of a string by removing patterns that usually carry
//! variable data in log messages (numbers, quoted text, UUIDs, IP addresses, ...). The digested
//! value is stored in a new field named after the input field with a `_digest` suffix, and can be
//! used for further processing or analysis such as counting template occurrences or similarity
//! analysis.
21
22use std::borrow::Cow;
23
24use regex::Regex;
25use snafu::OptionExt;
26use vrl::prelude::Bytes;
27use vrl::value::{KeyString, Value as VrlValue};
28
29use crate::error::{
30    DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
31    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
32};
33use crate::etl::field::Fields;
34use crate::etl::processor::{
35    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
36};
37
/// Processor kind name; returned by `kind()` and reported as `processor` in this processor's errors.
pub(crate) const PROCESSOR_DIGEST: &str = "digest";

/// YAML key whose value is a list of built-in preset pattern names (e.g. `numbers`, `ip`).
const PRESETS_PATTERNS_NAME: &str = "presets";
/// YAML key whose value is a list of custom regular expressions whose matches are removed.
const REGEX_PATTERNS_NAME: &str = "regex";
42
/// Built-in patterns selectable by name under the `presets` key of the digest processor.
///
/// In configuration, each preset is named by its lowercase variant name: `numbers`, `quoted`,
/// `bracketed`, `uuid` or `ip`. Each variant compiles to a fixed regex via `PresetPattern::regex`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PresetPattern {
    /// Runs of decimal digits.
    Numbers,
    /// Text enclosed in straight or typographic single/double quotes, quotes included.
    Quoted,
    /// Text enclosed in ASCII or CJK brackets, brackets included.
    ///
    /// The opening and closing brackets do not have to be of the same kind.
    Bracketed,
    /// Hyphenated 8-4-4-4-12 hexadecimal UUIDs (either letter case).
    Uuid,
    /// IPv4 addresses or bracketed IPv6 addresses, each with an optional `:port`.
    Ip,
}
50
51impl std::fmt::Display for PresetPattern {
52    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53        match self {
54            PresetPattern::Numbers => write!(f, "numbers"),
55            PresetPattern::Quoted => write!(f, "quoted"),
56            PresetPattern::Bracketed => write!(f, "bracketed"),
57            PresetPattern::Uuid => write!(f, "uuid"),
58            PresetPattern::Ip => write!(f, "ip"),
59        }
60    }
61}
62
63impl std::str::FromStr for PresetPattern {
64    type Err = Error;
65
66    fn from_str(pattern: &str) -> Result<Self> {
67        match pattern {
68            "numbers" => Ok(PresetPattern::Numbers),
69            "quoted" => Ok(PresetPattern::Quoted),
70            "bracketed" => Ok(PresetPattern::Bracketed),
71            "uuid" => Ok(PresetPattern::Uuid),
72            "ip" => Ok(PresetPattern::Ip),
73            _ => DigestPatternInvalidSnafu { pattern }.fail(),
74        }
75    }
76}
77
78impl PresetPattern {
79    fn regex(&self) -> Regex {
80        match self {
81            PresetPattern::Numbers => Regex::new(r"\d+").unwrap(),
82            PresetPattern::Quoted => Regex::new(r#"["'“”‘’][^"'“”‘’]*["'“”‘’]"#).unwrap(),
83            PresetPattern::Bracketed => Regex::new(r#"[({\[<「『【〔[{〈《][^(){}\[\]<>「」『』【】〔〕[]{}〈〉《》]*[)}\]>」』】〕]}〉》]"#).unwrap(),
84            PresetPattern::Uuid => Regex::new(r"\b[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}\b").unwrap(),
85            PresetPattern::Ip => Regex::new(r"((\d{1,3}\.){3}\d{1,3}(:\d+)?|(\[[0-9a-fA-F:]+\])(:\d+)?)").unwrap(),
86        }
87    }
88}
89
/// Removes configured patterns from string fields, producing a "digest" of each value.
///
/// Every match of each configured pattern (presets and custom regexes) is deleted, pattern by
/// pattern in order, and the remaining text is stored under the field's target key. Despite the
/// name, no hashing is involved.
#[derive(Debug, Default)]
pub struct DigestProcessor {
    /// Fields to digest; each result is written to the field's target key (or the input key if
    /// no target is set). When built from YAML the target is `<input>_digest`.
    fields: Fields,
    /// When `true`, absent or `null` input fields are skipped instead of raising an error.
    ignore_missing: bool,
    /// Compiled patterns, applied in order; every match is replaced with the empty string.
    patterns: Vec<Regex>,
}
97
98impl DigestProcessor {
99    fn remove_quoted_content(&self, val: &str) -> String {
100        let re = Regex::new(r#""[^"]*""#).unwrap();
101        re.replace_all(val, "").to_string()
102    }
103
104    fn process_string(&self, val: &str) -> Result<VrlValue> {
105        let mut input = Cow::from(val);
106        for pattern in &self.patterns {
107            if let Cow::Owned(new_string) = pattern.replace_all(&input, "") {
108                input = Cow::Owned(new_string);
109            }
110        }
111
112        Ok(VrlValue::Bytes(Bytes::from(input.to_string())))
113    }
114
115    fn process(&self, val: &VrlValue) -> Result<VrlValue> {
116        match val {
117            VrlValue::Bytes(val) => self.process_string(String::from_utf8_lossy(val).as_ref()),
118            _ => ProcessorExpectStringSnafu {
119                processor: PROCESSOR_DIGEST,
120                v: val.clone(),
121            }
122            .fail(),
123        }
124    }
125}
126
127impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessor {
128    type Error = Error;
129
130    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
131        let mut fields = Fields::default();
132        let mut ignore_missing = false;
133        let mut patterns = Vec::new();
134
135        for (k, v) in value.iter() {
136            let key = k
137                .as_str()
138                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
139
140            match key {
141                FIELD_NAME => {
142                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
143                }
144                FIELDS_NAME => {
145                    fields = yaml_new_fields(v, FIELDS_NAME)?;
146                }
147                IGNORE_MISSING_NAME => {
148                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
149                }
150                PRESETS_PATTERNS_NAME => {
151                    let preset_patterns: Vec<String> = v
152                        .as_vec()
153                        .with_context(|| DigestPatternInvalidSnafu {
154                            pattern: key.to_string(),
155                        })?
156                        .iter()
157                        .map(|p| p.as_str().unwrap().to_string())
158                        .collect();
159                    for pattern in preset_patterns {
160                        let preset_pattern = pattern.parse::<PresetPattern>()?;
161                        let regex = preset_pattern.regex();
162                        patterns.push(regex);
163                    }
164                }
165                REGEX_PATTERNS_NAME => {
166                    let regex_patterns: Vec<String> = v
167                        .as_vec()
168                        .with_context(|| DigestPatternInvalidSnafu {
169                            pattern: key.to_string(),
170                        })?
171                        .iter()
172                        .map(|p| p.as_str().unwrap().to_string())
173                        .collect();
174                    for pattern in regex_patterns {
175                        let regex = Regex::new(&pattern).unwrap();
176                        patterns.push(regex);
177                    }
178                }
179                _ => {}
180            }
181        }
182
183        for field in fields.iter_mut() {
184            field.set_target_field(Some(format!("{}_digest", field.input_field())));
185        }
186
187        Ok(DigestProcessor {
188            fields,
189            patterns,
190            ignore_missing,
191        })
192    }
193}
194
195impl crate::etl::processor::Processor for DigestProcessor {
196    fn kind(&self) -> &str {
197        PROCESSOR_DIGEST
198    }
199
200    fn ignore_missing(&self) -> bool {
201        self.ignore_missing
202    }
203
204    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
205        for field in self.fields.iter() {
206            let index = field.input_field();
207            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
208            match val.get(index) {
209                Some(VrlValue::Null) | None => {
210                    if !self.ignore_missing {
211                        return ProcessorMissingFieldSnafu {
212                            processor: self.kind(),
213                            field: field.input_field(),
214                        }
215                        .fail();
216                    }
217                }
218                Some(v) => {
219                    let result = self.process(v)?;
220                    let output_index = field.target_or_input_field();
221                    val.insert(KeyString::from(output_index), result);
222                }
223            }
224        }
225        Ok(val)
226    }
227}
228
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a processor that applies only `patterns`. Fields are left empty because these
    /// tests call `process` directly.
    fn processor_with(patterns: Vec<Regex>) -> DigestProcessor {
        DigestProcessor {
            fields: Fields::default(),
            ignore_missing: false,
            patterns,
        }
    }

    /// Asserts that digesting the string `raw` yields exactly `expected`.
    fn assert_digest(processor: &DigestProcessor, raw: &str, expected: &str) {
        let input = VrlValue::Bytes(Bytes::from(raw.to_string()));
        let result = processor.process(&input).unwrap();
        assert_eq!(
            result,
            VrlValue::Bytes(Bytes::from(expected.to_string())),
            "unexpected digest for {raw:?}"
        );
    }

    #[test]
    fn test_digest_processor_ip() {
        let processor = processor_with(vec![PresetPattern::Ip.regex()]);

        for ip in [
            "192.168.1.1",
            "192.168.1.1:8080",
            "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
            "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080",
        ] {
            assert_digest(&processor, ip, "");
        }

        // Negative case
        assert_digest(&processor, "not an ip", "not an ip");
    }

    #[test]
    fn test_digest_processor_uuid() {
        let processor = processor_with(vec![PresetPattern::Uuid.regex()]);

        for uuid in [
            // version 1
            "123e4567-e89b-12d3-a456-426614174000",
            // version 1
            "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
            // version 5
            "886313e1-3b8a-5372-9b90-0c9aee199e5d",
            // uppercase hex digits
            "A987FBC9-4BED-3078-CF07-9141BA07C9F3",
        ] {
            assert_digest(&processor, uuid, "");
        }

        // Negative case
        assert_digest(&processor, "not a uuid", "not a uuid");
    }

    #[test]
    fn test_digest_processor_brackets() {
        let processor = processor_with(vec![PresetPattern::Bracketed.regex()]);

        // ASCII and CJK bracket pairs are removed together with their content.
        for bracketed in [
            "[content]",
            "(content)",
            "「content」",
            "『content』",
            "【content】",
        ] {
            assert_digest(&processor, bracketed, "");
        }

        // Unmatched/unclosed brackets should not match.
        assert_digest(&processor, "[content", "[content");
        assert_digest(&processor, "content]", "content]");

        // Known limitation: opening and closing brackets are not paired, so a mismatched
        // pair is still removed.
        assert_digest(&processor, "[content}", "");

        // Negative case
        assert_digest(&processor, "no brackets", "no brackets");
    }

    #[test]
    fn test_digest_processor_quotes() {
        let processor = processor_with(vec![PresetPattern::Quoted.regex()]);

        assert_digest(&processor, "\"quoted content\"", "");
        assert_digest(&processor, "no quotes", "no quotes");
        assert_digest(&processor, "", "");
    }

    #[test]
    fn test_digest_processor_custom_regex() {
        let processor = processor_with(vec![Regex::new(r"\d+").unwrap()]);

        assert_digest(&processor, "12345", "");
        assert_digest(&processor, "no digits", "no digits");
        assert_digest(&processor, "", "");
    }
}