pipeline/etl/processor/
letter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt;
16
17use crate::error::{
18    Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
19    ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME, METHOD_NAME,
25};
26use crate::etl::value::Value;
27
28pub(crate) const PROCESSOR_LETTER: &str = "letter";
29
30#[derive(Debug, Default)]
31enum Method {
32    Upper,
33    #[default]
34    Lower,
35    Capital,
36}
37
38impl std::fmt::Display for Method {
39    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
40        match self {
41            Method::Upper => write!(f, "upper"),
42            Method::Lower => write!(f, "lower"),
43            Method::Capital => write!(f, "capital"),
44        }
45    }
46}
47
48impl std::str::FromStr for Method {
49    type Err = Error;
50
51    fn from_str(s: &str) -> Result<Self> {
52        match s.to_lowercase().as_str() {
53            "upper" => Ok(Method::Upper),
54            "lower" => Ok(Method::Lower),
55            "capital" => Ok(Method::Capital),
56            _ => LetterInvalidMethodSnafu { method: s }.fail(),
57        }
58    }
59}
60
61/// only support string value
62#[derive(Debug, Default)]
63pub struct LetterProcessor {
64    fields: Fields,
65    method: Method,
66    ignore_missing: bool,
67}
68
69impl LetterProcessor {
70    fn process_field(&self, val: &str) -> Result<Value> {
71        let processed = match self.method {
72            Method::Upper => val.to_uppercase(),
73            Method::Lower => val.to_lowercase(),
74            Method::Capital => capitalize(val),
75        };
76        let val = Value::String(processed);
77
78        Ok(val)
79    }
80}
81
82impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor {
83    type Error = Error;
84
85    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
86        let mut fields = Fields::default();
87        let mut method = Method::Lower;
88        let mut ignore_missing = false;
89
90        for (k, v) in value.iter() {
91            let key = k
92                .as_str()
93                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
94            match key {
95                FIELD_NAME => {
96                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
97                }
98                FIELDS_NAME => {
99                    fields = yaml_new_fields(v, FIELDS_NAME)?;
100                }
101                METHOD_NAME => {
102                    method = yaml_string(v, METHOD_NAME)?.parse()?;
103                }
104                IGNORE_MISSING_NAME => {
105                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
106                }
107                _ => {}
108            }
109        }
110
111        Ok(LetterProcessor {
112            fields,
113            method,
114            ignore_missing,
115        })
116    }
117}
118
119impl Processor for LetterProcessor {
120    fn kind(&self) -> &str {
121        PROCESSOR_LETTER
122    }
123
124    fn ignore_missing(&self) -> bool {
125        self.ignore_missing
126    }
127
128    fn exec_mut(&self, mut val: Value) -> Result<Value> {
129        for field in self.fields.iter() {
130            let index = field.input_field();
131            match val.get(index) {
132                Some(Value::String(s)) => {
133                    let result = self.process_field(s)?;
134                    let output_key = field.target_or_input_field();
135                    val.insert(output_key.to_string(), result)?;
136                }
137                Some(Value::Null) | None => {
138                    if !self.ignore_missing {
139                        return ProcessorMissingFieldSnafu {
140                            processor: self.kind(),
141                            field: field.input_field(),
142                        }
143                        .fail();
144                    }
145                }
146                Some(v) => {
147                    return ProcessorExpectStringSnafu {
148                        processor: self.kind(),
149                        v: v.clone(),
150                    }
151                    .fail();
152                }
153            }
154        }
155
156        Ok(val)
157    }
158}
159
160fn capitalize(s: &str) -> String {
161    let mut c = s.chars();
162    match c.next() {
163        None => String::new(),
164        Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use crate::etl::processor::letter::{LetterProcessor, Method};
171    use crate::etl::value::Value;
172
173    #[test]
174    fn test_process() {
175        {
176            let processor = LetterProcessor {
177                method: Method::Upper,
178                ..Default::default()
179            };
180            let processed = processor.process_field("pipeline").unwrap();
181            assert_eq!(Value::String("PIPELINE".into()), processed)
182        }
183
184        {
185            let processor = LetterProcessor {
186                method: Method::Lower,
187                ..Default::default()
188            };
189            let processed = processor.process_field("Pipeline").unwrap();
190            assert_eq!(Value::String("pipeline".into()), processed)
191        }
192
193        {
194            let processor = LetterProcessor {
195                method: Method::Capital,
196                ..Default::default()
197            };
198            let processed = processor.process_field("pipeline").unwrap();
199            assert_eq!(Value::String("Pipeline".into()), processed)
200        }
201    }
202}