pipeline/etl/processor/
gsub.rs1use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17use vrl::prelude::Bytes;
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21 Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
22 ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
23 ValueMustBeMapSnafu,
24};
25use crate::etl::field::Fields;
26use crate::etl::processor::{
27 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
28 IGNORE_MISSING_NAME, PATTERN_NAME,
29};
30
/// Kind identifier of the gsub processor; reported by `kind()` and attached to the
/// "expected string" error raised for non-string inputs.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// Configuration key under which the replacement text is read.
const REPLACEMENT_NAME: &str = "replacement";
34
35#[derive(Debug)]
37pub struct GsubProcessor {
38 fields: Fields,
39 pattern: Regex,
40 replacement: String,
41 ignore_missing: bool,
42}
43
44impl GsubProcessor {
45 fn process_string(&self, val: &str) -> Result<VrlValue> {
46 let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
47 let val = VrlValue::Bytes(Bytes::from(new_val));
48
49 Ok(val)
50 }
51
52 fn process(&self, val: &VrlValue) -> Result<VrlValue> {
53 match val {
54 VrlValue::Bytes(val) => self.process_string(String::from_utf8_lossy(val).as_ref()),
55 _ => ProcessorExpectStringSnafu {
56 processor: PROCESSOR_GSUB,
57 v: val.clone(),
58 }
59 .fail(),
60 }
61 }
62}
63
64impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
65 type Error = Error;
66
67 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
68 let mut fields = Fields::default();
69 let mut ignore_missing = false;
70 let mut pattern = None;
71 let mut replacement = None;
72
73 for (k, v) in value.iter() {
74 let key = k
75 .as_str()
76 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
77
78 match key {
79 FIELD_NAME => {
80 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
81 }
82 FIELDS_NAME => {
83 fields = yaml_new_fields(v, FIELDS_NAME)?;
84 }
85 PATTERN_NAME => {
86 let pattern_str = yaml_string(v, PATTERN_NAME)?;
87 pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
88 pattern: pattern_str,
89 })?);
90 }
91 REPLACEMENT_NAME => {
92 let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
93 replacement = Some(replacement_str);
94 }
95
96 IGNORE_MISSING_NAME => {
97 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
98 }
99
100 _ => {}
101 }
102 }
103
104 Ok(GsubProcessor {
105 fields,
106 pattern: pattern.context(GsubPatternRequiredSnafu)?,
107 replacement: replacement.context(GsubReplacementRequiredSnafu)?,
108 ignore_missing,
109 })
110 }
111}
112
113impl crate::etl::processor::Processor for GsubProcessor {
114 fn kind(&self) -> &str {
115 PROCESSOR_GSUB
116 }
117
118 fn ignore_missing(&self) -> bool {
119 self.ignore_missing
120 }
121
122 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
123 for field in self.fields.iter() {
124 let index = field.input_field();
125 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
126 match val.get(index) {
127 Some(VrlValue::Null) | None => {
128 if !self.ignore_missing {
129 return ProcessorMissingFieldSnafu {
130 processor: self.kind(),
131 field: field.input_field(),
132 }
133 .fail();
134 }
135 }
136 Some(v) => {
137 let result = self.process(v)?;
138 let output_index = field.target_or_input_field();
139 val.insert(KeyString::from(output_index.to_string()), result);
140 }
141 }
142 }
143 Ok(val)
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    // A value made only of digits is replaced wholesale by the replacement text.
    #[test]
    fn test_string_value() {
        let digits_to_xxx = GsubProcessor {
            fields: Fields::default(),
            pattern: Regex::new(r"\d+").unwrap(),
            replacement: "xxx".to_string(),
            ignore_missing: false,
        };

        let input = VrlValue::Bytes(Bytes::from("123"));
        let expected = VrlValue::Bytes(Bytes::from("xxx"));

        let actual = digits_to_xxx.process(&input).unwrap();

        assert_eq!(actual, expected);
    }
}