pipeline/etl/processor/
letter.rs1use snafu::OptionExt;
16use vrl::prelude::Bytes;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20 Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
21 ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26 IGNORE_MISSING_NAME, METHOD_NAME,
27};
28
29pub(crate) const PROCESSOR_LETTER: &str = "letter";
30
/// Letter-case conversion selected for a [`LetterProcessor`].
///
/// The default is [`Method::Lower`].
#[derive(Debug, Default)]
enum Method {
    /// Convert the value to upper case.
    Upper,
    /// Convert the value to lower case (the default).
    #[default]
    Lower,
    /// Upper-case only the first character of the value.
    Capital,
}
38
39impl std::fmt::Display for Method {
40 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41 match self {
42 Method::Upper => write!(f, "upper"),
43 Method::Lower => write!(f, "lower"),
44 Method::Capital => write!(f, "capital"),
45 }
46 }
47}
48
49impl std::str::FromStr for Method {
50 type Err = Error;
51
52 fn from_str(s: &str) -> Result<Self> {
53 match s.to_lowercase().as_str() {
54 "upper" => Ok(Method::Upper),
55 "lower" => Ok(Method::Lower),
56 "capital" => Ok(Method::Capital),
57 _ => LetterInvalidMethodSnafu { method: s }.fail(),
58 }
59 }
60}
61
62#[derive(Debug, Default)]
64pub struct LetterProcessor {
65 fields: Fields,
66 method: Method,
67 ignore_missing: bool,
68}
69
70impl LetterProcessor {
71 fn process_field(&self, val: &Bytes) -> VrlValue {
72 match self.method {
73 Method::Upper => VrlValue::Bytes(Bytes::from(val.to_ascii_uppercase())),
74 Method::Lower => VrlValue::Bytes(Bytes::from(val.to_ascii_lowercase())),
75 Method::Capital => VrlValue::Bytes(Bytes::from(capitalize(
76 String::from_utf8_lossy(val).as_ref(),
77 ))),
78 }
79 }
80}
81
82impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor {
83 type Error = Error;
84
85 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
86 let mut fields = Fields::default();
87 let mut method = Method::Lower;
88 let mut ignore_missing = false;
89
90 for (k, v) in value.iter() {
91 let key = k
92 .as_str()
93 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
94 match key {
95 FIELD_NAME => {
96 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
97 }
98 FIELDS_NAME => {
99 fields = yaml_new_fields(v, FIELDS_NAME)?;
100 }
101 METHOD_NAME => {
102 method = yaml_string(v, METHOD_NAME)?.parse()?;
103 }
104 IGNORE_MISSING_NAME => {
105 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
106 }
107 _ => {}
108 }
109 }
110
111 Ok(LetterProcessor {
112 fields,
113 method,
114 ignore_missing,
115 })
116 }
117}
118
119impl Processor for LetterProcessor {
120 fn kind(&self) -> &str {
121 PROCESSOR_LETTER
122 }
123
124 fn ignore_missing(&self) -> bool {
125 self.ignore_missing
126 }
127
128 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
129 for field in self.fields.iter() {
130 let index = field.input_field();
131 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
132 match val.get(index) {
133 Some(VrlValue::Bytes(s)) => {
134 let result = self.process_field(s);
135 let output_key = field.target_or_input_field();
136 val.insert(KeyString::from(output_key), result);
137 }
138 Some(VrlValue::Null) | None => {
139 if !self.ignore_missing {
140 return ProcessorMissingFieldSnafu {
141 processor: self.kind(),
142 field: field.input_field(),
143 }
144 .fail();
145 }
146 }
147 Some(v) => {
148 return ProcessorExpectStringSnafu {
149 processor: self.kind(),
150 v: v.clone(),
151 }
152 .fail();
153 }
154 }
155 }
156
157 Ok(val)
158 }
159}
160
/// Returns `s` with its first character upper-cased and the remainder
/// unchanged.
///
/// An empty input yields an empty string. The first character's upper-case
/// form may be more than one character long (for example `ß` becomes `SS`).
fn capitalize(s: &str) -> String {
    let mut rest = s.chars();
    if let Some(first) = rest.next() {
        let mut out = String::with_capacity(s.len());
        out.extend(first.to_uppercase());
        // `rest` now points just past the first character.
        out.push_str(rest.as_str());
        out
    } else {
        String::new()
    }
}
168
#[cfg(test)]
mod tests {
    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::processor::letter::{LetterProcessor, Method};

    /// Runs `process_field` once for each conversion method on an ASCII word.
    #[test]
    fn test_process() {
        // (method under test, input text, expected output text)
        let cases = [
            (Method::Upper, "pipeline", "PIPELINE"),
            (Method::Lower, "Pipeline", "pipeline"),
            (Method::Capital, "pipeline", "Pipeline"),
        ];

        for (method, input, expected) in cases {
            let processor = LetterProcessor {
                method,
                ..Default::default()
            };
            let processed = processor.process_field(&Bytes::from(input));
            assert_eq!(VrlValue::Bytes(Bytes::from(expected)), processed);
        }
    }
}