pipeline/etl/processor/
urlencoding.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use snafu::{OptionExt, ResultExt};
16use urlencoding::{decode, encode};
17
18use crate::error::{
19    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
20    UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25    IGNORE_MISSING_NAME, METHOD_NAME,
26};
27use crate::etl::value::Value;
28use crate::PipelineMap;
29
/// Name of the URL-encoding processor; this is the value reported by
/// `UrlEncodingProcessor::kind()`.
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
31
/// Direction in which the URL-encoding processor transforms a string.
///
/// `Decode` is the default. The enum carries no data, so it is cheap to copy
/// and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Method {
    /// Turn percent-encoded text back into its plain form.
    #[default]
    Decode,
    /// Percent-encode plain text.
    Encode,
}
38
39impl std::fmt::Display for Method {
40    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41        match self {
42            Method::Decode => write!(f, "decode"),
43            Method::Encode => write!(f, "encode"),
44        }
45    }
46}
47
48impl std::str::FromStr for Method {
49    type Err = Error;
50
51    fn from_str(s: &str) -> Result<Self> {
52        match s {
53            "decode" => Ok(Method::Decode),
54            "encode" => Ok(Method::Encode),
55            _ => UrlEncodingInvalidMethodSnafu { s }.fail(),
56        }
57    }
58}
59
/// Processor that URL-encodes or URL-decodes fields of a pipeline map.
///
/// Only string values are supported; any other non-null value is rejected
/// with an error when the processor runs.
#[derive(Debug, Default)]
pub struct UrlEncodingProcessor {
    /// Fields to transform; each names an input key and, optionally, a target key.
    fields: Fields,
    /// Whether values are encoded or decoded (decode by default).
    method: Method,
    /// When `true`, a missing or null input field is skipped instead of
    /// producing an error.
    ignore_missing: bool,
}
67
68impl UrlEncodingProcessor {
69    fn process_field(&self, val: &str) -> Result<Value> {
70        let processed = match self.method {
71            Method::Encode => encode(val).to_string(),
72            Method::Decode => decode(val).context(UrlEncodingDecodeSnafu)?.into_owned(),
73        };
74        Ok(Value::String(processed))
75    }
76}
77
78impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
79    type Error = Error;
80
81    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
82        let mut fields = Fields::default();
83        let mut method = Method::Decode;
84        let mut ignore_missing = false;
85
86        for (k, v) in value.iter() {
87            let key = k
88                .as_str()
89                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
90            match key {
91                FIELD_NAME => {
92                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
93                }
94                FIELDS_NAME => {
95                    fields = yaml_new_fields(v, FIELDS_NAME)?;
96                }
97
98                IGNORE_MISSING_NAME => {
99                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
100                }
101
102                METHOD_NAME => {
103                    let method_str = yaml_string(v, METHOD_NAME)?;
104                    method = method_str.parse()?;
105                }
106
107                _ => {}
108            }
109        }
110        let processor = UrlEncodingProcessor {
111            fields,
112            method,
113            ignore_missing,
114        };
115
116        Ok(processor)
117    }
118}
119
120impl crate::etl::processor::Processor for UrlEncodingProcessor {
121    fn kind(&self) -> &str {
122        PROCESSOR_URL_ENCODING
123    }
124
125    fn ignore_missing(&self) -> bool {
126        self.ignore_missing
127    }
128
129    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
130        for field in self.fields.iter() {
131            let index = field.input_field();
132            match val.get(index) {
133                Some(Value::String(s)) => {
134                    let result = self.process_field(s)?;
135                    let output_index = field.target_or_input_field();
136                    val.insert(output_index.to_string(), result);
137                }
138                Some(Value::Null) | None => {
139                    if !self.ignore_missing {
140                        return ProcessorMissingFieldSnafu {
141                            processor: self.kind(),
142                            field: field.input_field(),
143                        }
144                        .fail();
145                    }
146                }
147                Some(v) => {
148                    return ProcessorExpectStringSnafu {
149                        processor: self.kind(),
150                        v: v.clone(),
151                    }
152                    .fail();
153                }
154            }
155        }
156        Ok(())
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::{Method, UrlEncodingProcessor};
    use crate::etl::value::Value;

    /// Plain form of the sample string.
    const DECODED: &str = "//BC/[a=6.7.8.9,c=g,k=0,l=1]";
    /// Percent-encoded form of the same sample string.
    const ENCODED: &str = "%2F%2FBC%2F%5Ba%3D6.7.8.9%2Cc%3Dg%2Ck%3D0%2Cl%3D1%5D";

    /// Checks both directions: the default processor decodes, and a
    /// processor configured with `Method::Encode` encodes.
    #[test]
    fn test_decode_url() {
        let decoder = UrlEncodingProcessor::default();
        let decoded = decoder.process_field(ENCODED).unwrap();
        assert_eq!(Value::String(DECODED.to_string()), decoded);

        let encoder = UrlEncodingProcessor {
            method: Method::Encode,
            ..Default::default()
        };
        let encoded = encoder.process_field(DECODED).unwrap();
        assert_eq!(Value::String(ENCODED.to_string()), encoded);
    }
}