pipeline/etl/processor/
urlencoding.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::{OptionExt, ResultExt};
16use urlencoding::{decode, encode};
17
18use crate::error::{
19    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
20    UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
25    IGNORE_MISSING_NAME, METHOD_NAME,
26};
27use crate::etl::value::Value;
28
/// Identifier of the URL-encoding processor; this is the value reported by
/// `UrlEncodingProcessor`'s `kind()` implementation.
pub(crate) const PROCESSOR_URL_ENCODING: &str = "urlencoding";
30
/// Direction of the URL (percent-) encoding transformation.
///
/// `Decode` is the default variant. The type is a plain fieldless enum, so it
/// is cheap to copy and can be compared directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Method {
    /// Turn percent-encoded text back into its plain form.
    #[default]
    Decode,
    /// Percent-encode plain text.
    Encode,
}
37
38impl std::fmt::Display for Method {
39    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
40        match self {
41            Method::Decode => write!(f, "decode"),
42            Method::Encode => write!(f, "encode"),
43        }
44    }
45}
46
47impl std::str::FromStr for Method {
48    type Err = Error;
49
50    fn from_str(s: &str) -> Result<Self> {
51        match s {
52            "decode" => Ok(Method::Decode),
53            "encode" => Ok(Method::Encode),
54            _ => UrlEncodingInvalidMethodSnafu { s }.fail(),
55        }
56    }
57}
58
/// Processor that URL-encodes or URL-decodes the configured fields.
///
/// Only string values are supported.
#[derive(Debug, Default)]
pub struct UrlEncodingProcessor {
    /// Fields the processor is applied to.
    fields: Fields,
    /// Transformation to apply to each field value (encode or decode).
    method: Method,
    /// When `true`, a missing or null input field is skipped instead of
    /// producing an error.
    ignore_missing: bool,
}
66
67impl UrlEncodingProcessor {
68    fn process_field(&self, val: &str) -> Result<Value> {
69        let processed = match self.method {
70            Method::Encode => encode(val).to_string(),
71            Method::Decode => decode(val).context(UrlEncodingDecodeSnafu)?.into_owned(),
72        };
73        Ok(Value::String(processed))
74    }
75}
76
77impl TryFrom<&yaml_rust::yaml::Hash> for UrlEncodingProcessor {
78    type Error = Error;
79
80    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
81        let mut fields = Fields::default();
82        let mut method = Method::Decode;
83        let mut ignore_missing = false;
84
85        for (k, v) in value.iter() {
86            let key = k
87                .as_str()
88                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
89            match key {
90                FIELD_NAME => {
91                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
92                }
93                FIELDS_NAME => {
94                    fields = yaml_new_fields(v, FIELDS_NAME)?;
95                }
96
97                IGNORE_MISSING_NAME => {
98                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
99                }
100
101                METHOD_NAME => {
102                    let method_str = yaml_string(v, METHOD_NAME)?;
103                    method = method_str.parse()?;
104                }
105
106                _ => {}
107            }
108        }
109        let processor = UrlEncodingProcessor {
110            fields,
111            method,
112            ignore_missing,
113        };
114
115        Ok(processor)
116    }
117}
118
119impl crate::etl::processor::Processor for UrlEncodingProcessor {
120    fn kind(&self) -> &str {
121        PROCESSOR_URL_ENCODING
122    }
123
124    fn ignore_missing(&self) -> bool {
125        self.ignore_missing
126    }
127
128    fn exec_mut(&self, mut val: Value) -> Result<Value> {
129        for field in self.fields.iter() {
130            let index = field.input_field();
131            match val.get(index) {
132                Some(Value::String(s)) => {
133                    let result = self.process_field(s)?;
134                    let output_index = field.target_or_input_field();
135                    val.insert(output_index.to_string(), result)?;
136                }
137                Some(Value::Null) | None => {
138                    if !self.ignore_missing {
139                        return ProcessorMissingFieldSnafu {
140                            processor: self.kind(),
141                            field: field.input_field(),
142                        }
143                        .fail();
144                    }
145                }
146                Some(v) => {
147                    return ProcessorExpectStringSnafu {
148                        processor: self.kind(),
149                        v: v.clone(),
150                    }
151                    .fail();
152                }
153            }
154        }
155        Ok(val)
156    }
157}
158
#[cfg(test)]
mod tests {

    use crate::etl::field::Fields;
    use crate::etl::processor::urlencoding::UrlEncodingProcessor;
    use crate::etl::value::Value;

    /// Checks one sample in both directions: the default processor decodes,
    /// and an explicitly configured one encodes.
    #[test]
    fn test_decode_url() {
        let plain = "//BC/[a=6.7.8.9,c=g,k=0,l=1]";
        let escaped = "%2F%2FBC%2F%5Ba%3D6.7.8.9%2Cc%3Dg%2Ck%3D0%2Cl%3D1%5D";

        // The default method is decode.
        let decoder = UrlEncodingProcessor::default();
        let decoded_value = decoder.process_field(escaped).unwrap();
        assert_eq!(Value::String(plain.into()), decoded_value);

        let encoder = UrlEncodingProcessor {
            fields: Fields::default(),
            method: super::Method::Encode,
            ignore_missing: false,
        };
        let encoded_value = encoder.process_field(plain).unwrap();
        assert_eq!(Value::String(escaped.into()), encoded_value);
    }
}