pipeline/etl/processor/
gsub.rs1use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19 Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
20 ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25 IGNORE_MISSING_NAME, PATTERN_NAME,
26};
27use crate::etl::value::Value;
28
/// Name of this processor kind; returned by `GsubProcessor::kind` and attached
/// to the errors this module raises.
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// YAML key whose string value is used as the replacement text.
const REPLACEMENT_NAME: &str = "replacement";
32
33#[derive(Debug)]
35pub struct GsubProcessor {
36 fields: Fields,
37 pattern: Regex,
38 replacement: String,
39 ignore_missing: bool,
40}
41
42impl GsubProcessor {
43 fn process_string(&self, val: &str) -> Result<Value> {
44 let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
45 let val = Value::String(new_val);
46
47 Ok(val)
48 }
49
50 fn process(&self, val: &Value) -> Result<Value> {
51 match val {
52 Value::String(val) => self.process_string(val),
53 _ => ProcessorExpectStringSnafu {
54 processor: PROCESSOR_GSUB,
55 v: val.clone(),
56 }
57 .fail(),
58 }
59 }
60}
61
62impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
63 type Error = Error;
64
65 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
66 let mut fields = Fields::default();
67 let mut ignore_missing = false;
68 let mut pattern = None;
69 let mut replacement = None;
70
71 for (k, v) in value.iter() {
72 let key = k
73 .as_str()
74 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
75
76 match key {
77 FIELD_NAME => {
78 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
79 }
80 FIELDS_NAME => {
81 fields = yaml_new_fields(v, FIELDS_NAME)?;
82 }
83 PATTERN_NAME => {
84 let pattern_str = yaml_string(v, PATTERN_NAME)?;
85 pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
86 pattern: pattern_str,
87 })?);
88 }
89 REPLACEMENT_NAME => {
90 let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
91 replacement = Some(replacement_str);
92 }
93
94 IGNORE_MISSING_NAME => {
95 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
96 }
97
98 _ => {}
99 }
100 }
101
102 Ok(GsubProcessor {
103 fields,
104 pattern: pattern.context(GsubPatternRequiredSnafu)?,
105 replacement: replacement.context(GsubReplacementRequiredSnafu)?,
106 ignore_missing,
107 })
108 }
109}
110
111impl crate::etl::processor::Processor for GsubProcessor {
112 fn kind(&self) -> &str {
113 PROCESSOR_GSUB
114 }
115
116 fn ignore_missing(&self) -> bool {
117 self.ignore_missing
118 }
119
120 fn exec_mut(&self, mut val: Value) -> Result<Value> {
121 for field in self.fields.iter() {
122 let index = field.input_field();
123 match val.get(index) {
124 Some(Value::Null) | None => {
125 if !self.ignore_missing {
126 return ProcessorMissingFieldSnafu {
127 processor: self.kind(),
128 field: field.input_field(),
129 }
130 .fail();
131 }
132 }
133 Some(v) => {
134 let result = self.process(v)?;
135 let output_index = field.target_or_input_field();
136 val.insert(output_index.to_string(), result)?;
137 }
138 }
139 }
140 Ok(val)
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::etl::processor::gsub::GsubProcessor;
    use crate::etl::value::Value;

    /// A run of digits in a string value is replaced by the configured text.
    #[test]
    fn test_string_value() {
        let processor = GsubProcessor {
            fields: Fields::default(),
            pattern: regex::Regex::new(r"\d+").unwrap(),
            replacement: String::from("xxx"),
            ignore_missing: false,
        };

        let input = Value::String(String::from("123"));
        let expected = Value::String(String::from("xxx"));

        let actual = processor.process(&input).unwrap();

        assert_eq!(actual, expected);
    }
}