pipeline/etl/processor/
gsub.rs1use regex::Regex;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19 Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
20 ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25 IGNORE_MISSING_NAME, PATTERN_NAME,
26};
27use crate::etl::value::Value;
28use crate::etl::PipelineMap;
29
/// Identifier of this processor; also what [`Processor::kind`] reports for it.
///
/// [`Processor::kind`]: crate::etl::processor::Processor::kind
pub(crate) const PROCESSOR_GSUB: &str = "gsub";

/// YAML configuration key whose value is the replacement text.
const REPLACEMENT_NAME: &str = "replacement";
33
34#[derive(Debug)]
36pub struct GsubProcessor {
37 fields: Fields,
38 pattern: Regex,
39 replacement: String,
40 ignore_missing: bool,
41}
42
43impl GsubProcessor {
44 fn process_string(&self, val: &str) -> Result<Value> {
45 let new_val = self.pattern.replace_all(val, &self.replacement).to_string();
46 let val = Value::String(new_val);
47
48 Ok(val)
49 }
50
51 fn process(&self, val: &Value) -> Result<Value> {
52 match val {
53 Value::String(val) => self.process_string(val),
54 _ => ProcessorExpectStringSnafu {
55 processor: PROCESSOR_GSUB,
56 v: val.clone(),
57 }
58 .fail(),
59 }
60 }
61}
62
63impl TryFrom<&yaml_rust::yaml::Hash> for GsubProcessor {
64 type Error = Error;
65
66 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
67 let mut fields = Fields::default();
68 let mut ignore_missing = false;
69 let mut pattern = None;
70 let mut replacement = None;
71
72 for (k, v) in value.iter() {
73 let key = k
74 .as_str()
75 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
76
77 match key {
78 FIELD_NAME => {
79 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
80 }
81 FIELDS_NAME => {
82 fields = yaml_new_fields(v, FIELDS_NAME)?;
83 }
84 PATTERN_NAME => {
85 let pattern_str = yaml_string(v, PATTERN_NAME)?;
86 pattern = Some(Regex::new(&pattern_str).context(RegexSnafu {
87 pattern: pattern_str,
88 })?);
89 }
90 REPLACEMENT_NAME => {
91 let replacement_str = yaml_string(v, REPLACEMENT_NAME)?;
92 replacement = Some(replacement_str);
93 }
94
95 IGNORE_MISSING_NAME => {
96 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
97 }
98
99 _ => {}
100 }
101 }
102
103 Ok(GsubProcessor {
104 fields,
105 pattern: pattern.context(GsubPatternRequiredSnafu)?,
106 replacement: replacement.context(GsubReplacementRequiredSnafu)?,
107 ignore_missing,
108 })
109 }
110}
111
112impl crate::etl::processor::Processor for GsubProcessor {
113 fn kind(&self) -> &str {
114 PROCESSOR_GSUB
115 }
116
117 fn ignore_missing(&self) -> bool {
118 self.ignore_missing
119 }
120
121 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
122 for field in self.fields.iter() {
123 let index = field.input_field();
124 match val.get(index) {
125 Some(Value::Null) | None => {
126 if !self.ignore_missing {
127 return ProcessorMissingFieldSnafu {
128 processor: self.kind(),
129 field: field.input_field(),
130 }
131 .fail();
132 }
133 }
134 Some(v) => {
135 let result = self.process(v)?;
136 let output_index = field.target_or_input_field();
137 val.insert(output_index.to_string(), result);
138 }
139 }
140 }
141 Ok(())
142 }
143}
144
#[cfg(test)]
mod tests {
    // `super::*` already brings `GsubProcessor`, `Fields` and `Value` into
    // scope, so no further imports are needed.
    use super::*;

    /// Builds a processor with no fields, for exercising `process` directly.
    fn new_processor(pattern: &str, replacement: &str) -> GsubProcessor {
        GsubProcessor {
            fields: Fields::default(),
            pattern: regex::Regex::new(pattern).unwrap(),
            replacement: replacement.to_string(),
            ignore_missing: false,
        }
    }

    #[test]
    fn test_string_value() {
        let processor = new_processor(r"\d+", "xxx");
        let result = processor
            .process(&Value::String("123".to_string()))
            .unwrap();
        assert_eq!(result, Value::String("xxx".to_string()));
    }

    #[test]
    fn test_replaces_every_match() {
        let processor = new_processor(r"\d+", "#");
        let result = processor
            .process(&Value::String("a1b22c333".to_string()))
            .unwrap();
        assert_eq!(result, Value::String("a#b#c#".to_string()));
    }

    #[test]
    fn test_no_match_returns_input_unchanged() {
        let processor = new_processor(r"\d+", "#");
        let result = processor
            .process(&Value::String("abc".to_string()))
            .unwrap();
        assert_eq!(result, Value::String("abc".to_string()));
    }

    #[test]
    fn test_capture_group_references() {
        let processor = new_processor(r"(\d+)-(\d+)", "$2-$1");
        let result = processor
            .process(&Value::String("10-20".to_string()))
            .unwrap();
        assert_eq!(result, Value::String("20-10".to_string()));
    }

    #[test]
    fn test_non_string_value_is_rejected() {
        let processor = new_processor(r"\d+", "#");
        assert!(processor.process(&Value::Null).is_err());
    }
}