pipeline/etl/processor/
csv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/csv-processor.html
16
17use std::collections::BTreeMap;
18
19use csv::{ReaderBuilder, Trim};
20use itertools::EitherOrBoth::{Both, Left, Right};
21use itertools::Itertools;
22use snafu::{OptionExt, ResultExt};
23use vrl::prelude::Bytes;
24use vrl::value::{KeyString, Value as VrlValue};
25
26use crate::error::{
27    CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
28    KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
29    ValueMustBeMapSnafu,
30};
31use crate::etl::field::Fields;
32use crate::etl::processor::{
33    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
34    IGNORE_MISSING_NAME,
35};
36
/// Name under which this processor is selected in a pipeline definition.
pub(crate) const PROCESSOR_CSV: &str = "csv";

// Option keys read from the processor's YAML definition.

/// Key holding the output field names.
const TARGET_FIELDS: &str = "target_fields";
/// Key holding the single-byte column separator.
const SEPARATOR_NAME: &str = "separator";
/// Key holding the single-byte quote character.
const QUOTE_NAME: &str = "quote";
/// Key holding the boolean that toggles trimming of parsed fields.
const TRIM_NAME: &str = "trim";
/// Key holding the fill value for target fields that receive no column.
const EMPTY_VALUE_NAME: &str = "empty_value";
44
45/// only support string value
46#[derive(Debug, Default)]
47pub struct CsvProcessor {
48    reader: ReaderBuilder,
49    fields: Fields,
50
51    ignore_missing: bool,
52
53    // Value used to fill empty fields, empty fields will be skipped if this is not provided.
54    empty_value: Option<String>,
55    target_fields: Vec<String>,
56    // description
57    // if
58    // ignore_failure
59    // on_failure
60    // tag
61}
62
63impl CsvProcessor {
64    // process the csv format string to a map with target_fields as keys
65    fn process(&self, val: &[u8]) -> Result<BTreeMap<KeyString, VrlValue>> {
66        let mut reader = self.reader.from_reader(val);
67
68        if let Some(result) = reader.records().next() {
69            let record: csv::StringRecord = result.context(CsvReadSnafu)?;
70
71            let values = self
72                .target_fields
73                .iter()
74                .zip_longest(record.iter())
75                .filter_map(|zipped| match zipped {
76                    Both(target_field, val) => Some((
77                        KeyString::from(target_field.clone()),
78                        VrlValue::Bytes(Bytes::from(val.to_string())),
79                    )),
80                    // if target fields are more than extracted fields, fill the rest with empty value
81                    Left(target_field) => {
82                        let value = self
83                            .empty_value
84                            .as_ref()
85                            .map(|s| VrlValue::Bytes(Bytes::from(s.clone())))
86                            .unwrap_or(VrlValue::Null);
87                        Some((KeyString::from(target_field.clone()), value))
88                    }
89                    // if extracted fields are more than target fields, ignore the rest
90                    Right(_) => None,
91                })
92                .collect();
93
94            Ok(values)
95        } else {
96            CsvNoRecordSnafu.fail()
97        }
98    }
99}
100
101impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
102    type Error = Error;
103
104    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
105        let mut reader = ReaderBuilder::new();
106        reader.has_headers(false);
107
108        let mut fields = Fields::default();
109        let mut ignore_missing = false;
110        let mut empty_value = None;
111        let mut target_fields = vec![];
112
113        for (k, v) in hash {
114            let key = k
115                .as_str()
116                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
117            match key {
118                FIELD_NAME => {
119                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
120                }
121                FIELDS_NAME => {
122                    fields = yaml_new_fields(v, FIELDS_NAME)?;
123                }
124                TARGET_FIELDS => {
125                    target_fields = yaml_string(v, TARGET_FIELDS)?
126                        .split(',')
127                        .map(|s| s.trim().to_string())
128                        .filter(|s| !s.is_empty())
129                        .collect();
130                }
131                SEPARATOR_NAME => {
132                    let separator = yaml_string(v, SEPARATOR_NAME)?;
133                    if separator.len() != 1 {
134                        return CsvSeparatorNameSnafu {
135                            separator: SEPARATOR_NAME,
136                            value: separator,
137                        }
138                        .fail();
139                    } else {
140                        reader.delimiter(separator.as_bytes()[0]);
141                    }
142                }
143                QUOTE_NAME => {
144                    let quote = yaml_string(v, QUOTE_NAME)?;
145                    if quote.len() != 1 {
146                        return CsvQuoteNameSnafu {
147                            quote: QUOTE_NAME,
148                            value: quote,
149                        }
150                        .fail();
151                    } else {
152                        reader.quote(quote.as_bytes()[0]);
153                    }
154                }
155                TRIM_NAME => {
156                    let trim = yaml_bool(v, TRIM_NAME)?;
157                    if trim {
158                        reader.trim(Trim::All);
159                    } else {
160                        reader.trim(Trim::None);
161                    }
162                }
163                IGNORE_MISSING_NAME => {
164                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
165                }
166                EMPTY_VALUE_NAME => {
167                    empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
168                }
169
170                _ => {}
171            }
172        }
173        let proc = {
174            CsvProcessor {
175                reader,
176                fields,
177                ignore_missing,
178                empty_value,
179                target_fields,
180            }
181        };
182
183        Ok(proc)
184    }
185}
186
187impl Processor for CsvProcessor {
188    fn kind(&self) -> &str {
189        PROCESSOR_CSV
190    }
191
192    fn ignore_missing(&self) -> bool {
193        self.ignore_missing
194    }
195
196    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
197        for field in self.fields.iter() {
198            let name = field.input_field();
199
200            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
201
202            match val.get(name) {
203                Some(VrlValue::Bytes(v)) => {
204                    let results = self.process(v)?;
205                    val.extend(results);
206                }
207                Some(VrlValue::Null) | None => {
208                    if !self.ignore_missing {
209                        return ProcessorMissingFieldSnafu {
210                            processor: self.kind().to_string(),
211                            field: name.to_string(),
212                        }
213                        .fail();
214                    }
215                }
216                Some(v) => {
217                    return ProcessorExpectStringSnafu {
218                        processor: self.kind().to_string(),
219                        v: v.clone(),
220                    }
221                    .fail();
222                }
223            }
224        }
225        Ok(val)
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::etl::field::Field;

    /// Builds a header-less processor reading the `data` field into `targets`.
    fn build_processor(targets: &[&str], empty_value: Option<&str>) -> CsvProcessor {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        CsvProcessor {
            reader,
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: targets.iter().map(|t| t.to_string()).collect(),
            empty_value: empty_value.map(str::to_string),
            ..Default::default()
        }
    }

    /// Builds an expected output map; a `None` value stands for `VrlValue::Null`.
    fn expected(pairs: &[(&str, Option<&str>)]) -> BTreeMap<KeyString, VrlValue> {
        pairs
            .iter()
            .map(|(key, value)| {
                let value = match value {
                    Some(text) => VrlValue::Bytes(Bytes::from(text.to_string())),
                    None => VrlValue::Null,
                };
                (KeyString::from(*key), value)
            })
            .collect()
    }

    #[test]
    fn test_equal_length() {
        let processor = build_processor(&["a", "b"], None);

        let result = processor.process(b"1,2").unwrap();

        assert_eq!(result, expected(&[("a", Some("1")), ("b", Some("2"))]));
    }

    // target_fields longer than the parsed record
    #[test]
    fn test_target_fields_has_more_length() {
        // without empty_value, the target field with no column becomes null
        let processor = build_processor(&["a", "b", "c"], None);
        assert_eq!(
            processor.process(b"1,2").unwrap(),
            expected(&[("a", Some("1")), ("b", Some("2")), ("c", None)])
        );

        // with empty_value, that target field is filled with it
        let processor = build_processor(&["a", "b", "c"], Some("default"));
        assert_eq!(
            processor.process(b"1,2").unwrap(),
            expected(&[("a", Some("1")), ("b", Some("2")), ("c", Some("default"))])
        );
    }

    // record length vs. target_fields length
    // NOTE(review): the record here has exactly as many columns as
    // target_fields, so the "extra column is dropped" branch is not exercised.
    #[test]
    fn test_target_fields_has_less_length() {
        let processor = build_processor(&["a", "b"], Some("default"));

        let result = processor.process(b"1,2").unwrap();

        assert_eq!(result, expected(&[("a", Some("1")), ("b", Some("2"))]));
    }
}