pipeline/etl/processor/
digest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Digest the input string by removing certain patterns.
16//!
//! This processor helps extract useful information from a string by removing certain patterns,
//! which are often the variable parts of a log message. Digested values are stored in a new field
//! with the `_digest` suffix and can be used for further processing or analysis, such as counting
//! template occurrences or similarity analysis.
21
22use std::borrow::Cow;
23
24use regex::Regex;
25use snafu::OptionExt;
26
27use crate::error::{
28    DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
29    ProcessorMissingFieldSnafu, Result,
30};
31use crate::etl::field::Fields;
32use crate::etl::processor::{
33    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
34};
35use crate::etl::value::Value;
36use crate::etl::PipelineMap;
37
/// Kind name of the digest processor; returned by `kind()` and reported in processor errors.
pub(crate) const PROCESSOR_DIGEST: &str = "digest";

/// Configuration key whose value is a list of built-in preset pattern names.
const PRESETS_PATTERNS_NAME: &str = "presets";
/// Configuration key whose value is a list of user-supplied regular expressions.
const REGEX_PATTERNS_NAME: &str = "regex";
42
/// Built-in patterns that can be selected by name in the `presets` option.
///
/// Each variant maps to one regular expression whose matches are removed from the input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PresetPattern {
    /// Runs of decimal digits.
    Numbers,
    /// Text enclosed in straight or curly quotation marks, quotes included.
    Quoted,
    /// Text enclosed in ASCII or CJK brackets, brackets included.
    Bracketed,
    /// Hyphenated 8-4-4-4-12 hexadecimal UUIDs.
    Uuid,
    /// Dotted-quad IPv4 addresses or bracketed IPv6-style addresses, with an optional port.
    Ip,
}
50
51impl std::fmt::Display for PresetPattern {
52    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
53        match self {
54            PresetPattern::Numbers => write!(f, "numbers"),
55            PresetPattern::Quoted => write!(f, "quoted"),
56            PresetPattern::Bracketed => write!(f, "bracketed"),
57            PresetPattern::Uuid => write!(f, "uuid"),
58            PresetPattern::Ip => write!(f, "ip"),
59        }
60    }
61}
62
63impl std::str::FromStr for PresetPattern {
64    type Err = Error;
65
66    fn from_str(pattern: &str) -> Result<Self> {
67        match pattern {
68            "numbers" => Ok(PresetPattern::Numbers),
69            "quoted" => Ok(PresetPattern::Quoted),
70            "bracketed" => Ok(PresetPattern::Bracketed),
71            "uuid" => Ok(PresetPattern::Uuid),
72            "ip" => Ok(PresetPattern::Ip),
73            _ => DigestPatternInvalidSnafu { pattern }.fail(),
74        }
75    }
76}
77
78impl PresetPattern {
79    fn regex(&self) -> Regex {
80        match self {
81            PresetPattern::Numbers => Regex::new(r"\d+").unwrap(),
82            PresetPattern::Quoted => Regex::new(r#"["'“”‘’][^"'“”‘’]*["'“”‘’]"#).unwrap(),
83            PresetPattern::Bracketed => Regex::new(r#"[({\[<「『【〔[{〈《][^(){}\[\]<>「」『』【】〔〕[]{}〈〉《》]*[)}\]>」』】〕]}〉》]"#).unwrap(),
84            PresetPattern::Uuid => Regex::new(r"\b[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}\b").unwrap(),
85            PresetPattern::Ip => Regex::new(r"((\d{1,3}\.){3}\d{1,3}(:\d+)?|(\[[0-9a-fA-F:]+\])(:\d+)?)").unwrap(),
86        }
87    }
88}
89
/// Produces a "digest" of string fields by deleting every match of the configured patterns.
///
/// No hash is computed: the digest is the input text with the matched spans removed.
#[derive(Debug, Default)]
pub struct DigestProcessor {
    /// Fields to process; the `TryFrom<&yaml_rust::yaml::Hash>` constructor points each one at a
    /// target field named `<input>_digest`.
    fields: Fields,
    /// When `true`, a missing or null input field is skipped instead of producing an error.
    ignore_missing: bool,
    /// Patterns whose matches are removed from the input, applied in order.
    patterns: Vec<Regex>,
}
97
98impl DigestProcessor {
99    fn remove_quoted_content(&self, val: &str) -> String {
100        let re = Regex::new(r#""[^"]*""#).unwrap();
101        re.replace_all(val, "").to_string()
102    }
103
104    fn process_string(&self, val: &str) -> Result<Value> {
105        let mut input = Cow::from(val);
106        for pattern in &self.patterns {
107            if let Cow::Owned(new_string) = pattern.replace_all(&input, "") {
108                input = Cow::Owned(new_string);
109            }
110        }
111
112        Ok(Value::String(input.into_owned()))
113    }
114
115    fn process(&self, val: &Value) -> Result<Value> {
116        match val {
117            Value::String(val) => self.process_string(val),
118            _ => ProcessorExpectStringSnafu {
119                processor: PROCESSOR_DIGEST,
120                v: val.clone(),
121            }
122            .fail(),
123        }
124    }
125}
126
127impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessor {
128    type Error = Error;
129
130    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
131        let mut fields = Fields::default();
132        let mut ignore_missing = false;
133        let mut patterns = Vec::new();
134
135        for (k, v) in value.iter() {
136            let key = k
137                .as_str()
138                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
139
140            match key {
141                FIELD_NAME => {
142                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
143                }
144                FIELDS_NAME => {
145                    fields = yaml_new_fields(v, FIELDS_NAME)?;
146                }
147                IGNORE_MISSING_NAME => {
148                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
149                }
150                PRESETS_PATTERNS_NAME => {
151                    let preset_patterns: Vec<String> = v
152                        .as_vec()
153                        .with_context(|| DigestPatternInvalidSnafu {
154                            pattern: key.to_string(),
155                        })?
156                        .iter()
157                        .map(|p| p.as_str().unwrap().to_string())
158                        .collect();
159                    for pattern in preset_patterns {
160                        let preset_pattern = pattern.parse::<PresetPattern>()?;
161                        let regex = preset_pattern.regex();
162                        patterns.push(regex);
163                    }
164                }
165                REGEX_PATTERNS_NAME => {
166                    let regex_patterns: Vec<String> = v
167                        .as_vec()
168                        .with_context(|| DigestPatternInvalidSnafu {
169                            pattern: key.to_string(),
170                        })?
171                        .iter()
172                        .map(|p| p.as_str().unwrap().to_string())
173                        .collect();
174                    for pattern in regex_patterns {
175                        let regex = Regex::new(&pattern).unwrap();
176                        patterns.push(regex);
177                    }
178                }
179                _ => {}
180            }
181        }
182
183        for field in fields.iter_mut() {
184            field.set_target_field(Some(format!("{}_digest", field.input_field())));
185        }
186
187        Ok(DigestProcessor {
188            fields,
189            patterns,
190            ignore_missing,
191        })
192    }
193}
194
195impl crate::etl::processor::Processor for DigestProcessor {
196    fn kind(&self) -> &str {
197        PROCESSOR_DIGEST
198    }
199
200    fn ignore_missing(&self) -> bool {
201        self.ignore_missing
202    }
203
204    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
205        for field in self.fields.iter() {
206            let index = field.input_field();
207            match val.get(index) {
208                Some(Value::Null) | None => {
209                    if !self.ignore_missing {
210                        return ProcessorMissingFieldSnafu {
211                            processor: self.kind(),
212                            field: field.input_field(),
213                        }
214                        .fail();
215                    }
216                }
217                Some(v) => {
218                    let result = self.process(v)?;
219                    let output_index = field.target_or_input_field();
220                    val.insert(output_index.to_string(), result);
221                }
222            }
223        }
224        Ok(val)
225    }
226}
227
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a processor that strips only `pattern`. No fields are configured because the
    /// tests call `process` directly.
    fn processor_with(pattern: Regex) -> DigestProcessor {
        DigestProcessor {
            fields: Fields::default(),
            ignore_missing: false,
            patterns: vec![pattern],
        }
    }

    /// Asserts, for each `(input, expected)` pair in order, that digesting `input` gives
    /// `expected`.
    fn assert_digests(processor: &DigestProcessor, cases: &[(&str, &str)]) {
        for &(input, expected) in cases {
            let digested = processor
                .process(&Value::String(input.to_string()))
                .unwrap();
            assert_eq!(digested, Value::String(expected.to_string()));
        }
    }

    #[test]
    fn test_digest_processor_ip() {
        let processor = processor_with(PresetPattern::Ip.regex());

        assert_digests(
            &processor,
            &[
                ("192.168.1.1", ""),
                ("192.168.1.1:8080", ""),
                ("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]", ""),
                ("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080", ""),
                ("not an ip", "not an ip"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_uuid() {
        let processor = processor_with(PresetPattern::Uuid.regex());

        assert_digests(
            &processor,
            &[
                // UUID v4
                ("123e4567-e89b-12d3-a456-426614174000", ""),
                // UUID v1
                ("6ba7b810-9dad-11d1-80b4-00c04fd430c8", ""),
                // UUID v5
                ("886313e1-3b8a-5372-9b90-0c9aee199e5d", ""),
                // UUID with uppercase letters
                ("A987FBC9-4BED-3078-CF07-9141BA07C9F3", ""),
                // Negative case
                ("not a uuid", "not a uuid"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_brackets() {
        let processor = processor_with(PresetPattern::Bracketed.regex());

        assert_digests(
            &processor,
            &[
                // Basic brackets
                ("[content]", ""),
                ("(content)", ""),
                // Chinese brackets
                ("「content」", ""),
                ("『content』", ""),
                ("【content】", ""),
                // Unmatched/unclosed brackets should not match
                ("[content", "[content"),
                ("content]", "content]"),
                // Bad case: mismatched bracket kinds are still removed
                ("[content}", ""),
                // Negative case
                ("no brackets", "no brackets"),
            ],
        );
    }

    #[test]
    fn test_digest_processor_quotes() {
        let processor = processor_with(PresetPattern::Quoted.regex());

        assert_digests(
            &processor,
            &[
                ("\"quoted content\"", ""),
                ("no quotes", "no quotes"),
                ("", ""),
            ],
        );
    }

    #[test]
    fn test_digest_processor_custom_regex() {
        let processor = processor_with(Regex::new(r"\d+").unwrap());

        assert_digests(
            &processor,
            &[("12345", ""), ("no digits", "no digits"), ("", "")],
        );
    }
}