1use csv::{ReaderBuilder, Trim};
18use itertools::EitherOrBoth::{Both, Left, Right};
19use itertools::Itertools;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
24 KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
25};
26use crate::etl::field::Fields;
27use crate::etl::processor::{
28 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
29 IGNORE_MISSING_NAME,
30};
31use crate::etl::value::Value;
32use crate::etl::PipelineMap;
33
/// Processor kind identifier; this is the value `CsvProcessor::kind` returns.
pub(crate) const PROCESSOR_CSV: &str = "csv";

/// Configuration key for the field delimiter (must be exactly one byte long).
const SEPARATOR_NAME: &str = "separator";
/// Configuration key for the quote character (must be exactly one byte long).
const QUOTE_NAME: &str = "quote";
/// Configuration key for the boolean switching between `Trim::All` and `Trim::None`.
const TRIM_NAME: &str = "trim";
/// Configuration key for the string given to target fields that have no matching CSV column.
const EMPTY_VALUE_NAME: &str = "empty_value";
/// Configuration key for the comma-separated list of output field names.
const TARGET_FIELDS: &str = "target_fields";
41
/// Processor that splits a CSV-formatted string into named values.
///
/// Only `Value::String` inputs are parsed; the `Processor` implementation
/// decides what happens for missing, null, or non-string inputs.
#[derive(Debug, Default)]
pub struct CsvProcessor {
    /// Reader configuration (headers, delimiter, quote, trim) used to parse every input.
    reader: ReaderBuilder,
    /// Input fields whose values are parsed as CSV.
    fields: Fields,

    /// When `true`, a missing or null input field is skipped instead of producing an error.
    ignore_missing: bool,

    /// Value given to target fields that have no corresponding CSV column;
    /// `Value::Null` is used when this is `None`.
    empty_value: Option<String>,
    /// Names assigned, positionally, to the columns of the parsed record.
    target_fields: Vec<String>,
}
59
60impl CsvProcessor {
61 fn process(&self, val: &str) -> Result<PipelineMap> {
63 let mut reader = self.reader.from_reader(val.as_bytes());
64
65 if let Some(result) = reader.records().next() {
66 let record: csv::StringRecord = result.context(CsvReadSnafu)?;
67
68 let values = self
69 .target_fields
70 .iter()
71 .zip_longest(record.iter())
72 .filter_map(|zipped| match zipped {
73 Both(target_field, val) => {
74 Some((target_field.clone(), Value::String(val.into())))
75 }
76 Left(target_field) => {
78 let value = self
79 .empty_value
80 .as_ref()
81 .map(|s| Value::String(s.clone()))
82 .unwrap_or(Value::Null);
83 Some((target_field.clone(), value))
84 }
85 Right(_) => None,
87 })
88 .collect();
89
90 Ok(values)
91 } else {
92 CsvNoRecordSnafu.fail()
93 }
94 }
95}
96
97impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
98 type Error = Error;
99
100 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
101 let mut reader = ReaderBuilder::new();
102 reader.has_headers(false);
103
104 let mut fields = Fields::default();
105 let mut ignore_missing = false;
106 let mut empty_value = None;
107 let mut target_fields = vec![];
108
109 for (k, v) in hash {
110 let key = k
111 .as_str()
112 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
113 match key {
114 FIELD_NAME => {
115 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
116 }
117 FIELDS_NAME => {
118 fields = yaml_new_fields(v, FIELDS_NAME)?;
119 }
120 TARGET_FIELDS => {
121 target_fields = yaml_string(v, TARGET_FIELDS)?
122 .split(',')
123 .map(|s| s.trim().to_string())
124 .filter(|s| !s.is_empty())
125 .collect();
126 }
127 SEPARATOR_NAME => {
128 let separator = yaml_string(v, SEPARATOR_NAME)?;
129 if separator.len() != 1 {
130 return CsvSeparatorNameSnafu {
131 separator: SEPARATOR_NAME,
132 value: separator,
133 }
134 .fail();
135 } else {
136 reader.delimiter(separator.as_bytes()[0]);
137 }
138 }
139 QUOTE_NAME => {
140 let quote = yaml_string(v, QUOTE_NAME)?;
141 if quote.len() != 1 {
142 return CsvQuoteNameSnafu {
143 quote: QUOTE_NAME,
144 value: quote,
145 }
146 .fail();
147 } else {
148 reader.quote(quote.as_bytes()[0]);
149 }
150 }
151 TRIM_NAME => {
152 let trim = yaml_bool(v, TRIM_NAME)?;
153 if trim {
154 reader.trim(Trim::All);
155 } else {
156 reader.trim(Trim::None);
157 }
158 }
159 IGNORE_MISSING_NAME => {
160 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
161 }
162 EMPTY_VALUE_NAME => {
163 empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
164 }
165
166 _ => {}
167 }
168 }
169 let proc = {
170 CsvProcessor {
171 reader,
172 fields,
173 ignore_missing,
174 empty_value,
175 target_fields,
176 }
177 };
178
179 Ok(proc)
180 }
181}
182
183impl Processor for CsvProcessor {
184 fn kind(&self) -> &str {
185 PROCESSOR_CSV
186 }
187
188 fn ignore_missing(&self) -> bool {
189 self.ignore_missing
190 }
191
192 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
193 for field in self.fields.iter() {
194 let name = field.input_field();
195
196 match val.get(name) {
197 Some(Value::String(v)) => {
198 let results = self.process(v)?;
199 val.extend(results);
200 }
201 Some(Value::Null) | None => {
202 if !self.ignore_missing {
203 return ProcessorMissingFieldSnafu {
204 processor: self.kind().to_string(),
205 field: name.to_string(),
206 }
207 .fail();
208 }
209 }
210 Some(v) => {
211 return ProcessorExpectStringSnafu {
212 processor: self.kind().to_string(),
213 v: v.clone(),
214 }
215 .fail();
216 }
217 }
218 }
219 Ok(())
220 }
221}
222
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Field;

    /// Builds a header-less processor that reads the `data` field and writes
    /// the given target fields.
    fn new_processor(target_fields: &[&str], empty_value: Option<&str>) -> CsvProcessor {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        CsvProcessor {
            reader,
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: target_fields.iter().copied().map(String::from).collect(),
            empty_value: empty_value.map(String::from),
            ..Default::default()
        }
    }

    /// Same number of target fields and CSV columns: every column is mapped.
    #[test]
    fn test_equal_length() {
        let processor = new_processor(&["a", "b"], None);

        let result = processor.process("1,2").unwrap();

        let values: PipelineMap = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }

    /// More target fields than CSV columns: the remainder is filled.
    #[test]
    fn test_target_fields_has_more_length() {
        // Without `empty_value` the surplus target field becomes `Value::Null`.
        {
            let processor = new_processor(&["a", "b", "c"], None);

            let result = processor.process("1,2").unwrap();

            let values: PipelineMap = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::Null),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }

        // With `empty_value` the surplus target field receives that string.
        {
            let processor = new_processor(&["a", "b", "c"], Some("default"));

            let result = processor.process("1,2").unwrap();

            let values: PipelineMap = [
                ("a".into(), Value::String("1".into())),
                ("b".into(), Value::String("2".into())),
                ("c".into(), Value::String("default".into())),
            ]
            .into_iter()
            .collect();

            assert_eq!(result, values);
        }
    }

    /// Fewer target fields than CSV columns: the extra columns are dropped.
    #[test]
    fn test_target_fields_has_less_length() {
        let processor = new_processor(&["a", "b"], Some("default"));

        // Three columns against two target fields, so the third column must
        // be ignored. The previous input ("1,2") only repeated the
        // equal-length case and never reached that branch.
        let result = processor.process("1,2,3").unwrap();

        let values: PipelineMap = [
            ("a".into(), Value::String("1".into())),
            ("b".into(), Value::String("2".into())),
        ]
        .into_iter()
        .collect();

        assert_eq!(result, values);
    }
}