pipeline/etl/processor/
csv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
16
17use std::collections::BTreeMap;
18
19use csv::{ReaderBuilder, Trim};
20use itertools::EitherOrBoth::{Both, Left, Right};
21use itertools::Itertools;
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{
25    CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
26    KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
31    IGNORE_MISSING_NAME,
32};
33use crate::etl::value::Value;
34
/// Processor kind identifier; this is the value returned by `CsvProcessor::kind`.
pub(crate) const PROCESSOR_CSV: &str = "csv";

/// Config key for the field delimiter; the value must be exactly one byte long.
const SEPARATOR_NAME: &str = "separator";
/// Config key for the quote character; the value must be exactly one byte long.
const QUOTE_NAME: &str = "quote";
/// Config key for the boolean that switches the reader between `Trim::All` and `Trim::None`.
const TRIM_NAME: &str = "trim";
/// Config key for the string assigned to target fields that have no matching column.
const EMPTY_VALUE_NAME: &str = "empty_value";
/// Config key for the comma-separated list of output field names.
const TARGET_FIELDS: &str = "target_fields";
42
43/// only support string value
44#[derive(Debug, Default)]
45pub struct CsvProcessor {
46    reader: ReaderBuilder,
47    fields: Fields,
48
49    ignore_missing: bool,
50
51    // Value used to fill empty fields, empty fields will be skipped if this is not provided.
52    empty_value: Option<String>,
53    target_fields: Vec<String>,
54    // description
55    // if
56    // ignore_failure
57    // on_failure
58    // tag
59}
60
61impl CsvProcessor {
62    // process the csv format string to a map with target_fields as keys
63    fn process(&self, val: &str) -> Result<BTreeMap<String, Value>> {
64        let mut reader = self.reader.from_reader(val.as_bytes());
65
66        if let Some(result) = reader.records().next() {
67            let record: csv::StringRecord = result.context(CsvReadSnafu)?;
68
69            let values = self
70                .target_fields
71                .iter()
72                .zip_longest(record.iter())
73                .filter_map(|zipped| match zipped {
74                    Both(target_field, val) => {
75                        Some((target_field.clone(), Value::String(val.into())))
76                    }
77                    // if target fields are more than extracted fields, fill the rest with empty value
78                    Left(target_field) => {
79                        let value = self
80                            .empty_value
81                            .as_ref()
82                            .map(|s| Value::String(s.clone()))
83                            .unwrap_or(Value::Null);
84                        Some((target_field.clone(), value))
85                    }
86                    // if extracted fields are more than target fields, ignore the rest
87                    Right(_) => None,
88                })
89                .collect();
90
91            Ok(values)
92        } else {
93            CsvNoRecordSnafu.fail()
94        }
95    }
96}
97
98impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
99    type Error = Error;
100
101    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
102        let mut reader = ReaderBuilder::new();
103        reader.has_headers(false);
104
105        let mut fields = Fields::default();
106        let mut ignore_missing = false;
107        let mut empty_value = None;
108        let mut target_fields = vec![];
109
110        for (k, v) in hash {
111            let key = k
112                .as_str()
113                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
114            match key {
115                FIELD_NAME => {
116                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
117                }
118                FIELDS_NAME => {
119                    fields = yaml_new_fields(v, FIELDS_NAME)?;
120                }
121                TARGET_FIELDS => {
122                    target_fields = yaml_string(v, TARGET_FIELDS)?
123                        .split(',')
124                        .map(|s| s.trim().to_string())
125                        .filter(|s| !s.is_empty())
126                        .collect();
127                }
128                SEPARATOR_NAME => {
129                    let separator = yaml_string(v, SEPARATOR_NAME)?;
130                    if separator.len() != 1 {
131                        return CsvSeparatorNameSnafu {
132                            separator: SEPARATOR_NAME,
133                            value: separator,
134                        }
135                        .fail();
136                    } else {
137                        reader.delimiter(separator.as_bytes()[0]);
138                    }
139                }
140                QUOTE_NAME => {
141                    let quote = yaml_string(v, QUOTE_NAME)?;
142                    if quote.len() != 1 {
143                        return CsvQuoteNameSnafu {
144                            quote: QUOTE_NAME,
145                            value: quote,
146                        }
147                        .fail();
148                    } else {
149                        reader.quote(quote.as_bytes()[0]);
150                    }
151                }
152                TRIM_NAME => {
153                    let trim = yaml_bool(v, TRIM_NAME)?;
154                    if trim {
155                        reader.trim(Trim::All);
156                    } else {
157                        reader.trim(Trim::None);
158                    }
159                }
160                IGNORE_MISSING_NAME => {
161                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
162                }
163                EMPTY_VALUE_NAME => {
164                    empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
165                }
166
167                _ => {}
168            }
169        }
170        let proc = {
171            CsvProcessor {
172                reader,
173                fields,
174                ignore_missing,
175                empty_value,
176                target_fields,
177            }
178        };
179
180        Ok(proc)
181    }
182}
183
184impl Processor for CsvProcessor {
185    fn kind(&self) -> &str {
186        PROCESSOR_CSV
187    }
188
189    fn ignore_missing(&self) -> bool {
190        self.ignore_missing
191    }
192
193    fn exec_mut(&self, mut val: Value) -> Result<Value> {
194        for field in self.fields.iter() {
195            let name = field.input_field();
196
197            match val.get(name) {
198                Some(Value::String(v)) => {
199                    let results = self.process(v)?;
200                    val.extend(results.into())?;
201                }
202                Some(Value::Null) | None => {
203                    if !self.ignore_missing {
204                        return ProcessorMissingFieldSnafu {
205                            processor: self.kind().to_string(),
206                            field: name.to_string(),
207                        }
208                        .fail();
209                    }
210                }
211                Some(v) => {
212                    return ProcessorExpectStringSnafu {
213                        processor: self.kind().to_string(),
214                        v: v.clone(),
215                    }
216                    .fail();
217                }
218            }
219        }
220        Ok(val)
221    }
222}
223
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Field;

    // The record has exactly as many columns as there are target fields.
    #[test]
    fn test_equal_length() {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        let processor = CsvProcessor {
            reader,
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: vec!["a".into(), "b".into()],
            ..Default::default()
        };

        let result = processor.process("1,2").unwrap();

        let values: BTreeMap<String, Value> = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }

    // test target_fields length larger than the record length
    #[test]
    fn test_target_fields_has_more_length() {
        // with no empty value: the unmatched target field becomes null
        {
            let mut reader = csv::ReaderBuilder::new();
            reader.has_headers(false);
            let processor = CsvProcessor {
                reader,
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                ..Default::default()
            };

            let result = processor.process("1,2").unwrap();

            let values: BTreeMap<String, Value> = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::Null),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }

        // with empty value: the unmatched target field gets the configured default
        {
            let mut reader = csv::ReaderBuilder::new();
            reader.has_headers(false);
            let processor = CsvProcessor {
                reader,
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                empty_value: Some("default".into()),
                ..Default::default()
            };

            let result = processor.process("1,2").unwrap();

            let values: BTreeMap<String, Value> = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::String("default".into())),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }
    }

    // test record has larger length
    #[test]
    fn test_target_fields_has_less_length() {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        let processor = CsvProcessor {
            reader,
            target_fields: vec!["a".into(), "b".into()],
            empty_value: Some("default".into()),
            ..Default::default()
        };

        // Three columns against two target fields: the trailing "3" must be
        // dropped and must not be replaced by `empty_value`.
        let result = processor.process("1,2,3").unwrap();

        let values: BTreeMap<String, Value> = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }
}