1use std::collections::BTreeMap;
18
19use csv::{ReaderBuilder, Trim};
20use itertools::EitherOrBoth::{Both, Left, Right};
21use itertools::Itertools;
22use snafu::{OptionExt, ResultExt};
23use vrl::prelude::Bytes;
24use vrl::value::{KeyString, Value as VrlValue};
25
26use crate::error::{
27 CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
28 KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
29 ValueMustBeMapSnafu,
30};
31use crate::etl::field::Fields;
32use crate::etl::processor::{
33 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
34 IGNORE_MISSING_NAME,
35};
36
/// Processor kind identifier, returned by `CsvProcessor::kind`.
pub(crate) const PROCESSOR_CSV: &str = "csv";

// YAML configuration keys specific to the CSV processor.

/// Column delimiter; must be exactly one byte.
const SEPARATOR_NAME: &str = "separator";
/// Quote character; must be exactly one byte.
const QUOTE_NAME: &str = "quote";
/// Boolean toggling whitespace trimming of parsed values.
const TRIM_NAME: &str = "trim";
/// Value written to target fields that have no matching CSV column.
const EMPTY_VALUE_NAME: &str = "empty_value";
/// Comma-separated output field names, paired with CSV columns by position.
const TARGET_FIELDS: &str = "target_fields";
44
45#[derive(Debug, Default)]
47pub struct CsvProcessor {
48 reader: ReaderBuilder,
49 fields: Fields,
50
51 ignore_missing: bool,
52
53 empty_value: Option<String>,
55 target_fields: Vec<String>,
56 }
62
63impl CsvProcessor {
64 fn process(&self, val: &[u8]) -> Result<BTreeMap<KeyString, VrlValue>> {
66 let mut reader = self.reader.from_reader(val);
67
68 if let Some(result) = reader.records().next() {
69 let record: csv::StringRecord = result.context(CsvReadSnafu)?;
70
71 let values = self
72 .target_fields
73 .iter()
74 .zip_longest(record.iter())
75 .filter_map(|zipped| match zipped {
76 Both(target_field, val) => Some((
77 KeyString::from(target_field.clone()),
78 VrlValue::Bytes(Bytes::from(val.to_string())),
79 )),
80 Left(target_field) => {
82 let value = self
83 .empty_value
84 .as_ref()
85 .map(|s| VrlValue::Bytes(Bytes::from(s.clone())))
86 .unwrap_or(VrlValue::Null);
87 Some((KeyString::from(target_field.clone()), value))
88 }
89 Right(_) => None,
91 })
92 .collect();
93
94 Ok(values)
95 } else {
96 CsvNoRecordSnafu.fail()
97 }
98 }
99}
100
101impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
102 type Error = Error;
103
104 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
105 let mut reader = ReaderBuilder::new();
106 reader.has_headers(false);
107
108 let mut fields = Fields::default();
109 let mut ignore_missing = false;
110 let mut empty_value = None;
111 let mut target_fields = vec![];
112
113 for (k, v) in hash {
114 let key = k
115 .as_str()
116 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
117 match key {
118 FIELD_NAME => {
119 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
120 }
121 FIELDS_NAME => {
122 fields = yaml_new_fields(v, FIELDS_NAME)?;
123 }
124 TARGET_FIELDS => {
125 target_fields = yaml_string(v, TARGET_FIELDS)?
126 .split(',')
127 .map(|s| s.trim().to_string())
128 .filter(|s| !s.is_empty())
129 .collect();
130 }
131 SEPARATOR_NAME => {
132 let separator = yaml_string(v, SEPARATOR_NAME)?;
133 if separator.len() != 1 {
134 return CsvSeparatorNameSnafu {
135 separator: SEPARATOR_NAME,
136 value: separator,
137 }
138 .fail();
139 } else {
140 reader.delimiter(separator.as_bytes()[0]);
141 }
142 }
143 QUOTE_NAME => {
144 let quote = yaml_string(v, QUOTE_NAME)?;
145 if quote.len() != 1 {
146 return CsvQuoteNameSnafu {
147 quote: QUOTE_NAME,
148 value: quote,
149 }
150 .fail();
151 } else {
152 reader.quote(quote.as_bytes()[0]);
153 }
154 }
155 TRIM_NAME => {
156 let trim = yaml_bool(v, TRIM_NAME)?;
157 if trim {
158 reader.trim(Trim::All);
159 } else {
160 reader.trim(Trim::None);
161 }
162 }
163 IGNORE_MISSING_NAME => {
164 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
165 }
166 EMPTY_VALUE_NAME => {
167 empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
168 }
169
170 _ => {}
171 }
172 }
173 let proc = {
174 CsvProcessor {
175 reader,
176 fields,
177 ignore_missing,
178 empty_value,
179 target_fields,
180 }
181 };
182
183 Ok(proc)
184 }
185}
186
187impl Processor for CsvProcessor {
188 fn kind(&self) -> &str {
189 PROCESSOR_CSV
190 }
191
192 fn ignore_missing(&self) -> bool {
193 self.ignore_missing
194 }
195
196 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
197 for field in self.fields.iter() {
198 let name = field.input_field();
199
200 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
201
202 match val.get(name) {
203 Some(VrlValue::Bytes(v)) => {
204 let results = self.process(v)?;
205 val.extend(results);
206 }
207 Some(VrlValue::Null) | None => {
208 if !self.ignore_missing {
209 return ProcessorMissingFieldSnafu {
210 processor: self.kind().to_string(),
211 field: name.to_string(),
212 }
213 .fail();
214 }
215 }
216 Some(v) => {
217 return ProcessorExpectStringSnafu {
218 processor: self.kind().to_string(),
219 v: v.clone(),
220 }
221 .fail();
222 }
223 }
224 }
225 Ok(val)
226 }
227}
228
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Field;

    /// Reader configured like the one built from YAML: no header row, so the
    /// single input line is parsed as data.
    fn headerless_reader() -> csv::ReaderBuilder {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        reader
    }

    #[test]
    fn test_equal_length() {
        let processor = CsvProcessor {
            reader: headerless_reader(),
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: vec!["a".into(), "b".into()],
            ..Default::default()
        };

        let result = processor.process(b"1,2").unwrap();

        let values: BTreeMap<KeyString, VrlValue> = [
            (KeyString::from("a"), VrlValue::Bytes(Bytes::from("1"))),
            (KeyString::from("b"), VrlValue::Bytes(Bytes::from("2"))),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }

    #[test]
    fn test_target_fields_has_more_length() {
        // Without `empty_value`, unmatched target fields become null.
        {
            let processor = CsvProcessor {
                reader: headerless_reader(),
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                ..Default::default()
            };

            let result = processor.process(b"1,2").unwrap();

            let values: BTreeMap<KeyString, VrlValue> = [
                (KeyString::from("a"), VrlValue::Bytes(Bytes::from("1"))),
                (KeyString::from("b"), VrlValue::Bytes(Bytes::from("2"))),
                (KeyString::from("c"), VrlValue::Null),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }

        // With `empty_value`, unmatched target fields receive that value.
        {
            let processor = CsvProcessor {
                reader: headerless_reader(),
                fields: Fields::new(vec![Field::new("data", None)]),
                target_fields: vec!["a".into(), "b".into(), "c".into()],
                empty_value: Some("default".into()),
                ..Default::default()
            };

            let result = processor.process(b"1,2").unwrap();

            let values: BTreeMap<KeyString, VrlValue> = [
                (KeyString::from("a"), VrlValue::Bytes(Bytes::from("1"))),
                (KeyString::from("b"), VrlValue::Bytes(Bytes::from("2"))),
                (
                    KeyString::from("c"),
                    VrlValue::Bytes(Bytes::from("default")),
                ),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }
    }

    #[test]
    fn test_target_fields_has_less_length() {
        // Three columns but only two target fields: the surplus column must be
        // dropped, and `empty_value` must not be used.
        let processor = CsvProcessor {
            reader: headerless_reader(),
            target_fields: vec!["a".into(), "b".into()],
            empty_value: Some("default".into()),
            ..Default::default()
        };

        let result = processor.process(b"1,2,3").unwrap();

        let values: BTreeMap<KeyString, VrlValue> = [
            (KeyString::from("a"), VrlValue::Bytes(Bytes::from("1"))),
            (KeyString::from("b"), VrlValue::Bytes(Bytes::from("2"))),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }

    #[test]
    fn test_empty_input_has_no_record() {
        let processor = CsvProcessor {
            reader: headerless_reader(),
            target_fields: vec!["a".into()],
            ..Default::default()
        };

        assert!(processor.process(b"").is_err());
    }
}