// pipeline/etl/processor/letter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use snafu::OptionExt;
16
17use crate::error::{
18    Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
19    ProcessorMissingFieldSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME, METHOD_NAME,
25};
26use crate::etl::value::Value;
27use crate::etl::PipelineMap;
28
/// Kind identifier of the letter-case processor; this is the value returned
/// by `LetterProcessor`'s `kind()` implementation.
pub(crate) const PROCESSOR_LETTER: &str = "letter";
30
/// Case transformation applied by the letter processor.
///
/// `Lower` is the default variant.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Method {
    /// Convert the whole string to upper case.
    Upper,
    /// Convert the whole string to lower case.
    #[default]
    Lower,
    /// Upper-case only the first character and keep the rest unchanged.
    Capital,
}
38
39impl std::fmt::Display for Method {
40    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
41        match self {
42            Method::Upper => write!(f, "upper"),
43            Method::Lower => write!(f, "lower"),
44            Method::Capital => write!(f, "capital"),
45        }
46    }
47}
48
49impl std::str::FromStr for Method {
50    type Err = Error;
51
52    fn from_str(s: &str) -> Result<Self> {
53        match s.to_lowercase().as_str() {
54            "upper" => Ok(Method::Upper),
55            "lower" => Ok(Method::Lower),
56            "capital" => Ok(Method::Capital),
57            _ => LetterInvalidMethodSnafu { method: s }.fail(),
58        }
59    }
60}
61
62/// only support string value
63#[derive(Debug, Default)]
64pub struct LetterProcessor {
65    fields: Fields,
66    method: Method,
67    ignore_missing: bool,
68}
69
70impl LetterProcessor {
71    fn process_field(&self, val: &str) -> Result<Value> {
72        let processed = match self.method {
73            Method::Upper => val.to_uppercase(),
74            Method::Lower => val.to_lowercase(),
75            Method::Capital => capitalize(val),
76        };
77        let val = Value::String(processed);
78
79        Ok(val)
80    }
81}
82
83impl TryFrom<&yaml_rust::yaml::Hash> for LetterProcessor {
84    type Error = Error;
85
86    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
87        let mut fields = Fields::default();
88        let mut method = Method::Lower;
89        let mut ignore_missing = false;
90
91        for (k, v) in value.iter() {
92            let key = k
93                .as_str()
94                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
95            match key {
96                FIELD_NAME => {
97                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
98                }
99                FIELDS_NAME => {
100                    fields = yaml_new_fields(v, FIELDS_NAME)?;
101                }
102                METHOD_NAME => {
103                    method = yaml_string(v, METHOD_NAME)?.parse()?;
104                }
105                IGNORE_MISSING_NAME => {
106                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
107                }
108                _ => {}
109            }
110        }
111
112        Ok(LetterProcessor {
113            fields,
114            method,
115            ignore_missing,
116        })
117    }
118}
119
120impl Processor for LetterProcessor {
121    fn kind(&self) -> &str {
122        PROCESSOR_LETTER
123    }
124
125    fn ignore_missing(&self) -> bool {
126        self.ignore_missing
127    }
128
129    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
130        for field in self.fields.iter() {
131            let index = field.input_field();
132            match val.get(index) {
133                Some(Value::String(s)) => {
134                    let result = self.process_field(s)?;
135                    let output_key = field.target_or_input_field();
136                    val.insert(output_key.to_string(), result);
137                }
138                Some(Value::Null) | None => {
139                    if !self.ignore_missing {
140                        return ProcessorMissingFieldSnafu {
141                            processor: self.kind(),
142                            field: field.input_field(),
143                        }
144                        .fail();
145                    }
146                }
147                Some(v) => {
148                    return ProcessorExpectStringSnafu {
149                        processor: self.kind(),
150                        v: v.clone(),
151                    }
152                    .fail();
153                }
154            }
155        }
156
157        Ok(())
158    }
159}
160
/// Returns `s` with its first character upper-cased and the remainder left
/// unchanged.
///
/// The first character uses its full Unicode upper-case mapping, which may
/// expand to more than one character. An empty input yields an empty string.
fn capitalize(s: &str) -> String {
    // One buffer sized for the common case where the length does not change.
    let mut out = String::with_capacity(s.len());
    let mut rest = s.chars();
    if let Some(first) = rest.next() {
        out.extend(first.to_uppercase());
        out.push_str(rest.as_str());
    }
    out
}
168
#[cfg(test)]
mod tests {
    use crate::etl::processor::letter::{LetterProcessor, Method};
    use crate::etl::value::Value;

    /// Runs `process_field` on `input` with a processor configured for
    /// `method` and returns the produced value.
    fn apply(method: Method, input: &str) -> Value {
        let processor = LetterProcessor {
            method,
            ..Default::default()
        };
        processor.process_field(input).unwrap()
    }

    #[test]
    fn test_process() {
        assert_eq!(
            Value::String("PIPELINE".into()),
            apply(Method::Upper, "pipeline")
        );
        assert_eq!(
            Value::String("pipeline".into()),
            apply(Method::Lower, "Pipeline")
        );
        assert_eq!(
            Value::String("Pipeline".into()),
            apply(Method::Capital, "pipeline")
        );
    }

    #[test]
    fn test_process_edge_cases() {
        // Empty input stays empty for every method.
        assert_eq!(Value::String(String::new()), apply(Method::Upper, ""));
        assert_eq!(Value::String(String::new()), apply(Method::Lower, ""));
        assert_eq!(Value::String(String::new()), apply(Method::Capital, ""));

        // `Capital` only touches the first character.
        assert_eq!(
            Value::String("PipeLINE".into()),
            apply(Method::Capital, "pipeLINE")
        );
    }

    #[test]
    fn test_parse_method() {
        assert!(matches!("upper".parse::<Method>(), Ok(Method::Upper)));
        assert!(matches!("LOWER".parse::<Method>(), Ok(Method::Lower)));
        assert!(matches!("Capital".parse::<Method>(), Ok(Method::Capital)));
        assert!("title".parse::<Method>().is_err());
    }
}