// pipeline/etl/processor/letter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use snafu::OptionExt;
16use vrl::prelude::Bytes;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20    Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
21    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26    IGNORE_MISSING_NAME, METHOD_NAME,
27};
28
/// Kind identifier of the letter-case processor, as returned by
/// `LetterProcessor::kind`.
pub(crate) const PROCESSOR_LETTER: &str = "letter";
30
/// Case transformation applied by the letter processor.
///
/// `Lower` is the default. The enum carries no data, so it is cheap to copy
/// and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Method {
    /// Convert letters to upper case.
    Upper,
    /// Convert letters to lower case.
    #[default]
    Lower,
    /// Upper-case the first character only.
    Capital,
}
38
39impl std::fmt::Display for Method {
40    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41        match self {
42            Method::Upper => write!(f, "upper"),
43            Method::Lower => write!(f, "lower"),
44            Method::Capital => write!(f, "capital"),
45        }
46    }
47}
48
49impl std::str::FromStr for Method {
50    type Err = Error;
51
52    fn from_str(s: &str) -> Result<Self> {
53        match s.to_lowercase().as_str() {
54            "upper" => Ok(Method::Upper),
55            "lower" => Ok(Method::Lower),
56            "capital" => Ok(Method::Capital),
57            _ => LetterInvalidMethodSnafu { method: s }.fail(),
58        }
59    }
60}
61
62/// only support string value
63#[derive(Debug, Default)]
64pub struct LetterProcessor {
65    fields: Fields,
66    method: Method,
67    ignore_missing: bool,
68}
69
70impl LetterProcessor {
71    fn process_field(&self, val: &Bytes) -> VrlValue {
72        match self.method {
73            Method::Upper => VrlValue::Bytes(Bytes::from(val.to_ascii_uppercase())),
74            Method::Lower => VrlValue::Bytes(Bytes::from(val.to_ascii_lowercase())),
75            Method::Capital => VrlValue::Bytes(Bytes::from(capitalize(
76                String::from_utf8_lossy(val).as_ref(),
77            ))),
78        }
79    }
80}
81
82impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor {
83    type Error = Error;
84
85    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
86        let mut fields = Fields::default();
87        let mut method = Method::Lower;
88        let mut ignore_missing = false;
89
90        for (k, v) in value.iter() {
91            let key = k
92                .as_str()
93                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
94            match key {
95                FIELD_NAME => {
96                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
97                }
98                FIELDS_NAME => {
99                    fields = yaml_new_fields(v, FIELDS_NAME)?;
100                }
101                METHOD_NAME => {
102                    method = yaml_string(v, METHOD_NAME)?.parse()?;
103                }
104                IGNORE_MISSING_NAME => {
105                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
106                }
107                _ => {}
108            }
109        }
110
111        Ok(LetterProcessor {
112            fields,
113            method,
114            ignore_missing,
115        })
116    }
117}
118
119impl Processor for LetterProcessor {
120    fn kind(&self) -> &str {
121        PROCESSOR_LETTER
122    }
123
124    fn ignore_missing(&self) -> bool {
125        self.ignore_missing
126    }
127
128    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
129        for field in self.fields.iter() {
130            let index = field.input_field();
131            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
132            match val.get(index) {
133                Some(VrlValue::Bytes(s)) => {
134                    let result = self.process_field(s);
135                    let output_key = field.target_or_input_field();
136                    val.insert(KeyString::from(output_key), result);
137                }
138                Some(VrlValue::Null) | None => {
139                    if !self.ignore_missing {
140                        return ProcessorMissingFieldSnafu {
141                            processor: self.kind(),
142                            field: field.input_field(),
143                        }
144                        .fail();
145                    }
146                }
147                Some(v) => {
148                    return ProcessorExpectStringSnafu {
149                        processor: self.kind(),
150                        v: v.clone(),
151                    }
152                    .fail();
153                }
154            }
155        }
156
157        Ok(val)
158    }
159}
160
/// Returns `s` with its first character upper-cased and the rest unchanged.
///
/// The first character is converted with Unicode rules, so it may expand to
/// several characters (for example `ß` becomes `SS`). An empty input yields an
/// empty string.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => {
            // Build the result in one pre-sized buffer rather than collecting
            // the upper-cased prefix into a temporary and concatenating.
            let mut out = String::with_capacity(s.len());
            out.extend(first.to_uppercase());
            out.push_str(chars.as_str());
            out
        }
    }
}
168
#[cfg(test)]
mod tests {
    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use crate::etl::processor::letter::{LetterProcessor, Method};

    /// Runs `process_field` with the given method on a static string input.
    fn run(method: Method, input: &'static str) -> VrlValue {
        let processor = LetterProcessor {
            method,
            ..Default::default()
        };
        processor.process_field(&Bytes::from(input))
    }

    #[test]
    fn test_process() {
        assert_eq!(
            VrlValue::Bytes(Bytes::from("PIPELINE")),
            run(Method::Upper, "pipeline")
        );
        assert_eq!(
            VrlValue::Bytes(Bytes::from("pipeline")),
            run(Method::Lower, "Pipeline")
        );
        assert_eq!(
            VrlValue::Bytes(Bytes::from("Pipeline")),
            run(Method::Capital, "pipeline")
        );
    }

    #[test]
    fn test_process_empty() {
        assert_eq!(VrlValue::Bytes(Bytes::from("")), run(Method::Upper, ""));
        assert_eq!(VrlValue::Bytes(Bytes::from("")), run(Method::Lower, ""));
        assert_eq!(VrlValue::Bytes(Bytes::from("")), run(Method::Capital, ""));
    }

    #[test]
    fn test_process_non_ascii() {
        // Upper and Lower only touch ASCII letters.
        assert_eq!(
            VrlValue::Bytes(Bytes::from("CAFé")),
            run(Method::Upper, "café")
        );
        assert_eq!(
            VrlValue::Bytes(Bytes::from("cafÉ")),
            run(Method::Lower, "CAFÉ")
        );
        // Capital upper-cases the first character with Unicode rules and
        // leaves the remainder as it is.
        assert_eq!(
            VrlValue::Bytes(Bytes::from("Élan")),
            run(Method::Capital, "élan")
        );
        assert_eq!(
            VrlValue::Bytes(Bytes::from("PIPELINE")),
            run(Method::Capital, "pIPELINE")
        );
    }
}