pipeline/etl/processor/
gsub.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17use vrl::prelude::Bytes;
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21    Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
22    ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
23    ValueMustBeMapSnafu,
24};
25use crate::etl::field::Fields;
26use crate::etl::processor::{
27    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
28    IGNORE_MISSING_NAME, PATTERN_NAME,
29};
30
/// Kind name of this processor; returned by `kind()` and attached to the
/// errors it raises.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// Configuration key whose string value is used as the replacement text.
const REPLACEMENT_NAME: &str = "replacement";
34
/// A processor that replaces every match of a regex pattern in a string value
/// with a replacement string.
///
/// Only string values (`VrlValue::Bytes`) are supported; any other value type
/// is rejected with a `ProcessorExpectString` error.
#[derive(Debug)]
pub struct GsubProcessor {
    /// Fields to process; each one supplies the input key to read and the
    /// key (target or input) the result is written to.
    fields: Fields,
    /// Compiled regular expression whose matches are replaced.
    pattern: Regex,
    /// Text substituted for every match of `pattern`.
    replacement: String,
    /// When `true`, a missing or null input field is skipped instead of
    /// producing a missing-field error.
    ignore_missing: bool,
}
43
44impl GsubProcessor {
45    fn process_string(&self, val: &str) -> Result<VrlValue> {
46        let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
47        let val = VrlValue::Bytes(Bytes::from(new_val));
48
49        Ok(val)
50    }
51
52    fn process(&self, val: &VrlValue) -> Result<VrlValue> {
53        match val {
54            VrlValue::Bytes(val) => self.process_string(String::from_utf8_lossy(val).as_ref()),
55            _ => ProcessorExpectStringSnafu {
56                processor: PROCESSOR_GSUB,
57                v: val.clone(),
58            }
59            .fail(),
60        }
61    }
62}
63
64impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
65    type Error = Error;
66
67    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
68        let mut fields = Fields::default();
69        let mut ignore_missing = false;
70        let mut pattern = None;
71        let mut replacement = None;
72
73        for (k, v) in value.iter() {
74            let key = k
75                .as_str()
76                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
77
78            match key {
79                FIELD_NAME => {
80                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
81                }
82                FIELDS_NAME => {
83                    fields = yaml_new_fields(v, FIELDS_NAME)?;
84                }
85                PATTERN_NAME => {
86                    let pattern_str = yaml_string(v, PATTERN_NAME)?;
87                    pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
88                        pattern: pattern_str,
89                    })?);
90                }
91                REPLACEMENT_NAME => {
92                    let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
93                    replacement = Some(replacement_str);
94                }
95
96                IGNORE_MISSING_NAME => {
97                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
98                }
99
100                _ => {}
101            }
102        }
103
104        Ok(GsubProcessor {
105            fields,
106            pattern: pattern.context(GsubPatternRequiredSnafu)?,
107            replacement: replacement.context(GsubReplacementRequiredSnafu)?,
108            ignore_missing,
109        })
110    }
111}
112
113impl crate::etl::processor::Processor for GsubProcessor {
114    fn kind(&self) -> &str {
115        PROCESSOR_GSUB
116    }
117
118    fn ignore_missing(&self) -> bool {
119        self.ignore_missing
120    }
121
122    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
123        for field in self.fields.iter() {
124            let index = field.input_field();
125            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
126            match val.get(index) {
127                Some(VrlValue::Null) | None => {
128                    if !self.ignore_missing {
129                        return ProcessorMissingFieldSnafu {
130                            processor: self.kind(),
131                            field: field.input_field(),
132                        }
133                        .fail();
134                    }
135                }
136                Some(v) => {
137                    let result = self.process(v)?;
138                    let output_index = field.target_or_input_field();
139                    val.insert(KeyString::from(output_index.to_string()), result);
140                }
141            }
142        }
143        Ok(val)
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::etl::processor::gsub::GsubProcessor;

    /// A run of digits in a string value is replaced by the configured text.
    #[test]
    fn test_string_value() {
        let digit_masker = GsubProcessor {
            fields: Fields::default(),
            pattern: Regex::new(r"\d+").unwrap(),
            replacement: String::from("xxx"),
            ignore_missing: false,
        };

        let input = VrlValue::Bytes(Bytes::from("123"));
        let expected = VrlValue::Bytes(Bytes::from("xxx"));

        let actual = digit_masker.process(&input).unwrap();

        assert_eq!(actual, expected);
    }
}