pipeline/etl/processor/urlencoding.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt;
16use urlencoding::{decode_binary, encode_binary};
17use vrl::prelude::Bytes;
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
22    UrlEncodingInvalidMethodSnafu, ValueMustBeMapSnafu,
23};
24use crate::etl::field::Fields;
25use crate::etl::processor::{
26    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
27    IGNORE_MISSING_NAME, METHOD_NAME,
28};
29
/// Name reported by [`UrlEncodingProcessor`] as its processor kind.
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
31
/// Transformation applied by [`UrlEncodingProcessor`] to each configured field.
///
/// Parsed from the processor's `method` option via `FromStr`; when that option
/// is absent the processor decodes.
// Fieldless enum: cheap to copy and compare, so derive those traits too.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Method {
    /// Percent-decode the input bytes.
    #[default]
    Decode,
    /// Percent-encode the input bytes.
    Encode,
}
38
39impl std::fmt::Display for Method {
40    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41        match self {
42            Method::Decode => write!(f, "decode"),
43            Method::Encode => write!(f, "encode"),
44        }
45    }
46}
47
48impl std::str::FromStr for Method {
49    type Err = Error;
50
51    fn from_str(s: &str) -> Result<Self> {
52        match s {
53            "decode" => Ok(Method::Decode),
54            "encode" => Ok(Method::Encode),
55            _ => UrlEncodingInvalidMethodSnafu { s }.fail(),
56        }
57    }
58}
59
60/// only support string value
61#[derive(Debug, Default)]
62pub struct UrlEncodingProcessor {
63    fields: Fields,
64    method: Method,
65    ignore_missing: bool,
66}
67
68impl UrlEncodingProcessor {
69    fn process_field(&self, val: &Bytes) -> Result<VrlValue> {
70        let processed = match self.method {
71            Method::Encode => Bytes::from_iter(encode_binary(val).bytes()),
72            Method::Decode => Bytes::from(decode_binary(val).to_vec()),
73        };
74        Ok(VrlValue::Bytes(processed))
75    }
76}
77
78impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
79    type Error = Error;
80
81    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
82        let mut fields = Fields::default();
83        let mut method = Method::Decode;
84        let mut ignore_missing = false;
85
86        for (k, v) in value.iter() {
87            let key = k
88                .as_str()
89                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
90            match key {
91                FIELD_NAME => {
92                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
93                }
94                FIELDS_NAME => {
95                    fields = yaml_new_fields(v, FIELDS_NAME)?;
96                }
97
98                IGNORE_MISSING_NAME => {
99                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
100                }
101
102                METHOD_NAME => {
103                    let method_str = yaml_string(v, METHOD_NAME)?;
104                    method = method_str.parse()?;
105                }
106
107                _ => {}
108            }
109        }
110        let processor = UrlEncodingProcessor {
111            fields,
112            method,
113            ignore_missing,
114        };
115
116        Ok(processor)
117    }
118}
119
120impl crate::etl::processor::Processor for UrlEncodingProcessor {
121    fn kind(&self) -> &str {
122        PROCESSOR_URL_ENCODING
123    }
124
125    fn ignore_missing(&self) -> bool {
126        self.ignore_missing
127    }
128
129    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
130        for field in self.fields.iter() {
131            let index = field.input_field();
132            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
133            match val.get(index) {
134                Some(VrlValue::Bytes(s)) => {
135                    let result = self.process_field(s)?;
136                    let output_index = field.target_or_input_field();
137                    val.insert(KeyString::from(output_index), result);
138                }
139                Some(VrlValue::Null) | None => {
140                    if !self.ignore_missing {
141                        return ProcessorMissingFieldSnafu {
142                            processor: self.kind(),
143                            field: field.input_field(),
144                        }
145                        .fail();
146                    }
147                }
148                Some(v) => {
149                    return ProcessorExpectStringSnafu {
150                        processor: self.kind(),
151                        v: v.clone(),
152                    }
153                    .fail();
154                }
155            }
156        }
157        Ok(val)
158    }
159}
160
#[cfg(test)]
mod tests {
    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::field::Fields;
    use crate::etl::processor::urlencoding::UrlEncodingProcessor;

    /// A raw value and its expected percent-encoded form.
    const DECODED: &str = "//BC/[a=6.7.8.9,c=g,k=0,l=1]";
    const ENCODED: &str = "%2F%2FBC%2F%5Ba%3D6.7.8.9%2Cc%3Dg%2Ck%3D0%2Cl%3D1%5D";

    /// Runs `process_field` on `input`, panicking on error.
    fn run(processor: &UrlEncodingProcessor, input: &'static str) -> VrlValue {
        processor.process_field(&Bytes::from(input)).unwrap()
    }

    #[test]
    fn test_decode_url() {
        // The default processor decodes.
        let decoder = UrlEncodingProcessor::default();
        assert_eq!(VrlValue::Bytes(Bytes::from(DECODED)), run(&decoder, ENCODED));

        let encoder = UrlEncodingProcessor {
            fields: Fields::default(),
            method: super::Method::Encode,
            ignore_missing: false,
        };
        assert_eq!(VrlValue::Bytes(Bytes::from(ENCODED)), run(&encoder, DECODED));
    }
}