pipeline/etl/processor/
urlencoding.rs1use snafu::OptionExt;
16use urlencoding::{decode_binary, encode_binary};
17use vrl::prelude::Bytes;
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21 Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
22 UrlEncodingInvalidMethodSnafu, ValueMustBeMapSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{
26 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
27 IGNORE_MISSING_NAME, METHOD_NAME,
28};
29
/// Kind identifier of the URL-encoding processor; returned by its `Processor::kind` implementation.
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
31
/// Direction of the URL (percent) encoding transformation.
///
/// `Decode` is the `Default` variant.
#[derive(Debug, Default)]
enum Method {
    /// Percent-decode the input.
    #[default]
    Decode,
    /// Percent-encode the input.
    Encode,
}
38
39impl std::fmt::Display for Method {
40 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41 match self {
42 Method::Decode => write!(f, "decode"),
43 Method::Encode => write!(f, "encode"),
44 }
45 }
46}
47
48impl std::str::FromStr for Method {
49 type Err = Error;
50
51 fn from_str(s: &str) -> Result<Self> {
52 match s {
53 "decode" => Ok(Method::Decode),
54 "encode" => Ok(Method::Encode),
55 _ => UrlEncodingInvalidMethodSnafu { s }.fail(),
56 }
57 }
58}
59
60#[derive(Debug, Default)]
62pub struct UrlEncodingProcessor {
63 fields: Fields,
64 method: Method,
65 ignore_missing: bool,
66}
67
68impl UrlEncodingProcessor {
69 fn process_field(&self, val: &Bytes) -> Result<VrlValue> {
70 let processed = match self.method {
71 Method::Encode => Bytes::from_iter(encode_binary(val).bytes()),
72 Method::Decode => Bytes::from(decode_binary(val).to_vec()),
73 };
74 Ok(VrlValue::Bytes(processed))
75 }
76}
77
78impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
79 type Error = Error;
80
81 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
82 let mut fields = Fields::default();
83 let mut method = Method::Decode;
84 let mut ignore_missing = false;
85
86 for (k, v) in value.iter() {
87 let key = k
88 .as_str()
89 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
90 match key {
91 FIELD_NAME => {
92 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
93 }
94 FIELDS_NAME => {
95 fields = yaml_new_fields(v, FIELDS_NAME)?;
96 }
97
98 IGNORE_MISSING_NAME => {
99 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
100 }
101
102 METHOD_NAME => {
103 let method_str = yaml_string(v, METHOD_NAME)?;
104 method = method_str.parse()?;
105 }
106
107 _ => {}
108 }
109 }
110 let processor = UrlEncodingProcessor {
111 fields,
112 method,
113 ignore_missing,
114 };
115
116 Ok(processor)
117 }
118}
119
120impl crate::etl::processor::Processor for UrlEncodingProcessor {
121 fn kind(&self) -> &str {
122 PROCESSOR_URL_ENCODING
123 }
124
125 fn ignore_missing(&self) -> bool {
126 self.ignore_missing
127 }
128
129 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
130 for field in self.fields.iter() {
131 let index = field.input_field();
132 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
133 match val.get(index) {
134 Some(VrlValue::Bytes(s)) => {
135 let result = self.process_field(s)?;
136 let output_index = field.target_or_input_field();
137 val.insert(KeyString::from(output_index), result);
138 }
139 Some(VrlValue::Null) | None => {
140 if !self.ignore_missing {
141 return ProcessorMissingFieldSnafu {
142 processor: self.kind(),
143 field: field.input_field(),
144 }
145 .fail();
146 }
147 }
148 Some(v) => {
149 return ProcessorExpectStringSnafu {
150 processor: self.kind(),
151 v: v.clone(),
152 }
153 .fail();
154 }
155 }
156 }
157 Ok(val)
158 }
159}
160
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::field::Fields;
    use crate::etl::processor::urlencoding::UrlEncodingProcessor;

    /// Checks both directions on one sample: the default processor decodes,
    /// and a processor configured with `Method::Encode` produces the escaped
    /// form again.
    #[test]
    fn test_decode_url() {
        const PLAIN: &str = "//BC/[a=6.7.8.9,c=g,k=0,l=1]";
        const ESCAPED: &str = "%2F%2FBC%2F%5Ba%3D6.7.8.9%2Cc%3Dg%2Ck%3D0%2Cl%3D1%5D";

        // Default configuration decodes.
        let decoder = UrlEncodingProcessor::default();
        let decoded = decoder.process_field(&Bytes::from(ESCAPED)).unwrap();
        assert_eq!(VrlValue::Bytes(Bytes::from(PLAIN)), decoded);

        // Explicit encode configuration.
        let encoder = UrlEncodingProcessor {
            fields: Fields::default(),
            method: super::Method::Encode,
            ignore_missing: false,
        };
        let encoded = encoder.process_field(&Bytes::from(PLAIN)).unwrap();
        assert_eq!(VrlValue::Bytes(Bytes::from(ESCAPED)), encoded);
    }
}