pipeline/etl/processor/
csv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
16
17use csv::{ReaderBuilder, Trim};
18use itertools::EitherOrBoth::{Both, Left, Right};
19use itertools::Itertools;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
24    KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
25};
26use crate::etl::field::Fields;
27use crate::etl::processor::{
28    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
29    IGNORE_MISSING_NAME,
30};
31use crate::etl::value::Value;
32use crate::etl::PipelineMap;
33
/// Processor kind identifier; this is the value returned by `CsvProcessor::kind`.
pub(crate) const PROCESSOR_CSV: &str = "csv";

// Keys looked up in the processor's YAML configuration hash.

/// Field delimiter; the configured value must be exactly one byte long.
const SEPARATOR_NAME: &str = "separator";
/// Quote character; the configured value must be exactly one byte long.
const QUOTE_NAME: &str = "quote";
/// Boolean switch: `true` selects `Trim::All`, `false` selects `Trim::None`.
const TRIM_NAME: &str = "trim";
/// String assigned to target fields that have no matching column in the record.
const EMPTY_VALUE_NAME: &str = "empty_value";
/// Comma-separated list of output field names, matched to columns by position.
const TARGET_FIELDS: &str = "target_fields";
41
/// Pipeline processor that splits a CSV-formatted string field into several
/// named fields.
///
/// Only string input values are supported: a non-string, non-null input is
/// reported as an error when the processor runs.
#[derive(Debug, Default)]
pub struct CsvProcessor {
    // Pre-configured CSV reader builder (delimiter, quote, trim, headers); a
    // fresh reader is created from it for every processed value.
    reader: ReaderBuilder,
    // Input fields whose string values are parsed as CSV.
    fields: Fields,

    // When true, a missing or null input field is silently skipped instead of
    // raising a missing-field error.
    ignore_missing: bool,

    // Value assigned to target fields that have no corresponding column in the
    // parsed record. When this is `None`, such fields are set to `Value::Null`
    // (they are not omitted from the output).
    empty_value: Option<String>,
    // Output field names, matched to the record's columns by position.
    target_fields: Vec<String>,
    // NOTE(review): the names below look like placeholders for common processor
    // options from the referenced Elasticsearch CSV processor that this struct
    // does not carry yet -- confirm before relying on them.
    // description
    // if
    // ignore_failure
    // on_failure
    // tag
}
59
60impl CsvProcessor {
61    // process the csv format string to a map with target_fields as keys
62    fn process(&self, val: &str) -> Result<PipelineMap> {
63        let mut reader = self.reader.from_reader(val.as_bytes());
64
65        if let Some(result) = reader.records().next() {
66            let record: csv::StringRecord = result.context(CsvReadSnafu)?;
67
68            let values = self
69                .target_fields
70                .iter()
71                .zip_longest(record.iter())
72                .filter_map(|zipped| match zipped {
73                    Both(target_field, val) => {
74                        Some((target_field.clone(), Value::String(val.into())))
75                    }
76                    // if target fields are more than extracted fields, fill the rest with empty value
77                    Left(target_field) => {
78                        let value = self
79                            .empty_value
80                            .as_ref()
81                            .map(|s| Value::String(s.clone()))
82                            .unwrap_or(Value::Null);
83                        Some((target_field.clone(), value))
84                    }
85                    // if extracted fields are more than target fields, ignore the rest
86                    Right(_) => None,
87                })
88                .collect();
89
90            Ok(values)
91        } else {
92            CsvNoRecordSnafu.fail()
93        }
94    }
95}
96
97impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
98    type Error = Error;
99
100    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
101        let mut reader = ReaderBuilder::new();
102        reader.has_headers(false);
103
104        let mut fields = Fields::default();
105        let mut ignore_missing = false;
106        let mut empty_value = None;
107        let mut target_fields = vec![];
108
109        for (k, v) in hash {
110            let key = k
111                .as_str()
112                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
113            match key {
114                FIELD_NAME => {
115                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
116                }
117                FIELDS_NAME => {
118                    fields = yaml_new_fields(v, FIELDS_NAME)?;
119                }
120                TARGET_FIELDS => {
121                    target_fields = yaml_string(v, TARGET_FIELDS)?
122                        .split(',')
123                        .map(|s| s.trim().to_string())
124                        .filter(|s| !s.is_empty())
125                        .collect();
126                }
127                SEPARATOR_NAME => {
128                    let separator = yaml_string(v, SEPARATOR_NAME)?;
129                    if separator.len() != 1 {
130                        return CsvSeparatorNameSnafu {
131                            separator: SEPARATOR_NAME,
132                            value: separator,
133                        }
134                        .fail();
135                    } else {
136                        reader.delimiter(separator.as_bytes()[0]);
137                    }
138                }
139                QUOTE_NAME => {
140                    let quote = yaml_string(v, QUOTE_NAME)?;
141                    if quote.len() != 1 {
142                        return CsvQuoteNameSnafu {
143                            quote: QUOTE_NAME,
144                            value: quote,
145                        }
146                        .fail();
147                    } else {
148                        reader.quote(quote.as_bytes()[0]);
149                    }
150                }
151                TRIM_NAME => {
152                    let trim = yaml_bool(v, TRIM_NAME)?;
153                    if trim {
154                        reader.trim(Trim::All);
155                    } else {
156                        reader.trim(Trim::None);
157                    }
158                }
159                IGNORE_MISSING_NAME => {
160                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
161                }
162                EMPTY_VALUE_NAME => {
163                    empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
164                }
165
166                _ => {}
167            }
168        }
169        let proc = {
170            CsvProcessor {
171                reader,
172                fields,
173                ignore_missing,
174                empty_value,
175                target_fields,
176            }
177        };
178
179        Ok(proc)
180    }
181}
182
183impl Processor for CsvProcessor {
184    fn kind(&self) -> &str {
185        PROCESSOR_CSV
186    }
187
188    fn ignore_missing(&self) -> bool {
189        self.ignore_missing
190    }
191
192    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
193        for field in self.fields.iter() {
194            let name = field.input_field();
195
196            match val.get(name) {
197                Some(Value::String(v)) => {
198                    let results = self.process(v)?;
199                    val.extend(results);
200                }
201                Some(Value::Null) | None => {
202                    if !self.ignore_missing {
203                        return ProcessorMissingFieldSnafu {
204                            processor: self.kind().to_string(),
205                            field: name.to_string(),
206                        }
207                        .fail();
208                    }
209                }
210                Some(v) => {
211                    return ProcessorExpectStringSnafu {
212                        processor: self.kind().to_string(),
213                        v: v.clone(),
214                    }
215                    .fail();
216                }
217            }
218        }
219        Ok(())
220    }
221}
222
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Field;

    /// Builds the reader configuration shared by all tests: no header row,
    /// every other setting left at the `csv` crate default.
    fn headerless_reader() -> csv::ReaderBuilder {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        reader
    }

    // record and target_fields have the same length
    #[test]
    fn test_equal_length() {
        let processor = CsvProcessor {
            reader: headerless_reader(),
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: vec!["a".into(), "b".into()],
            ..Default::default()
        };

        let result = processor.process("1,2").unwrap();

        let values: PipelineMap = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }

    // test target_fields length larger than the record length
    #[test]
    fn test_target_fields_has_more_length() {
        // with no empty value: the extra target field becomes null
        {
            let processor = CsvProcessor {
                reader: headerless_reader(),
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                ..Default::default()
            };

            let result = processor.process("1,2").unwrap();

            let values: PipelineMap = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::Null),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }

        // with empty value: the extra target field receives the configured filler
        {
            let processor = CsvProcessor {
                reader: headerless_reader(),
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                empty_value: Some("default".into()),
                ..Default::default()
            };

            let result = processor.process("1,2").unwrap();

            let values: PipelineMap = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::String("default".into())),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }
    }

    // test record has larger length than target_fields
    #[test]
    fn test_target_fields_has_less_length() {
        let processor = CsvProcessor {
            reader: headerless_reader(),
            target_fields: vec!["a".into(), "b".into()],
            empty_value: Some("default".into()),
            ..Default::default()
        };

        // Three columns against two target fields: the surplus column must be
        // dropped, and `empty_value` must not appear in the output.
        let result = processor.process("1,2,3").unwrap();

        let values: PipelineMap = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }
}