1use std::collections::BTreeMap;
18
19use csv::{ReaderBuilder, Trim};
20use itertools::EitherOrBoth::{Both, Left, Right};
21use itertools::Itertools;
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{
25 CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
26 KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
31 IGNORE_MISSING_NAME,
32};
33use crate::etl::value::Value;
34
/// Processor kind name; this is the value returned by `CsvProcessor::kind`.
pub(crate) const PROCESSOR_CSV: &str = "csv";

/// Config key for the column delimiter; the value must be exactly one byte long.
const SEPARATOR_NAME: &str = "separator";
/// Config key for the quote character; the value must be exactly one byte long.
const QUOTE_NAME: &str = "quote";
/// Config key for a boolean: `true` selects `Trim::All`, `false` selects `Trim::None`.
const TRIM_NAME: &str = "trim";
/// Config key for the string assigned to target fields that have no matching column.
const EMPTY_VALUE_NAME: &str = "empty_value";
/// Config key for the comma-separated list of output field names.
const TARGET_FIELDS: &str = "target_fields";
42
/// Pipeline processor that parses a string field as one CSV record and maps
/// its columns onto named target fields.
///
/// Only string inputs are accepted; see the `Processor` implementation for
/// how null, missing and non-string inputs are handled.
#[derive(Debug, Default)]
pub struct CsvProcessor {
    /// CSV reader configuration (delimiter, quote, trimming) used for every parse.
    reader: ReaderBuilder,
    /// Input fields whose values are parsed as CSV.
    fields: Fields,

    /// When `false`, a null or absent input field is reported as an error;
    /// when `true`, such fields are skipped.
    ignore_missing: bool,

    /// Value assigned to target fields that have no corresponding column in
    /// the record. When `None`, those target fields are set to `Value::Null`.
    empty_value: Option<String>,
    /// Output field names, matched to record columns by position.
    target_fields: Vec<String>,
}
60
61impl CsvProcessor {
62 fn process(&self, val: &str) -> Result<BTreeMap<String, Value>> {
64 let mut reader = self.reader.from_reader(val.as_bytes());
65
66 if let Some(result) = reader.records().next() {
67 let record: csv::StringRecord = result.context(CsvReadSnafu)?;
68
69 let values = self
70 .target_fields
71 .iter()
72 .zip_longest(record.iter())
73 .filter_map(|zipped| match zipped {
74 Both(target_field, val) => {
75 Some((target_field.clone(), Value::String(val.into())))
76 }
77 Left(target_field) => {
79 let value = self
80 .empty_value
81 .as_ref()
82 .map(|s| Value::String(s.clone()))
83 .unwrap_or(Value::Null);
84 Some((target_field.clone(), value))
85 }
86 Right(_) => None,
88 })
89 .collect();
90
91 Ok(values)
92 } else {
93 CsvNoRecordSnafu.fail()
94 }
95 }
96}
97
98impl TryFrom<&yaml_rust::yaml::Hash> for CsvProcessor {
99 type Error = Error;
100
101 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
102 let mut reader = ReaderBuilder::new();
103 reader.has_headers(false);
104
105 let mut fields = Fields::default();
106 let mut ignore_missing = false;
107 let mut empty_value = None;
108 let mut target_fields = vec![];
109
110 for (k, v) in hash {
111 let key = k
112 .as_str()
113 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
114 match key {
115 FIELD_NAME => {
116 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
117 }
118 FIELDS_NAME => {
119 fields = yaml_new_fields(v, FIELDS_NAME)?;
120 }
121 TARGET_FIELDS => {
122 target_fields = yaml_string(v, TARGET_FIELDS)?
123 .split(',')
124 .map(|s| s.trim().to_string())
125 .filter(|s| !s.is_empty())
126 .collect();
127 }
128 SEPARATOR_NAME => {
129 let separator = yaml_string(v, SEPARATOR_NAME)?;
130 if separator.len() != 1 {
131 return CsvSeparatorNameSnafu {
132 separator: SEPARATOR_NAME,
133 value: separator,
134 }
135 .fail();
136 } else {
137 reader.delimiter(separator.as_bytes()[0]);
138 }
139 }
140 QUOTE_NAME => {
141 let quote = yaml_string(v, QUOTE_NAME)?;
142 if quote.len() != 1 {
143 return CsvQuoteNameSnafu {
144 quote: QUOTE_NAME,
145 value: quote,
146 }
147 .fail();
148 } else {
149 reader.quote(quote.as_bytes()[0]);
150 }
151 }
152 TRIM_NAME => {
153 let trim = yaml_bool(v, TRIM_NAME)?;
154 if trim {
155 reader.trim(Trim::All);
156 } else {
157 reader.trim(Trim::None);
158 }
159 }
160 IGNORE_MISSING_NAME => {
161 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
162 }
163 EMPTY_VALUE_NAME => {
164 empty_value = Some(yaml_string(v, EMPTY_VALUE_NAME)?);
165 }
166
167 _ => {}
168 }
169 }
170 let proc = {
171 CsvProcessor {
172 reader,
173 fields,
174 ignore_missing,
175 empty_value,
176 target_fields,
177 }
178 };
179
180 Ok(proc)
181 }
182}
183
184impl Processor for CsvProcessor {
185 fn kind(&self) -> &str {
186 PROCESSOR_CSV
187 }
188
189 fn ignore_missing(&self) -> bool {
190 self.ignore_missing
191 }
192
193 fn exec_mut(&self, mut val: Value) -> Result<Value> {
194 for field in self.fields.iter() {
195 let name = field.input_field();
196
197 match val.get(name) {
198 Some(Value::String(v)) => {
199 let results = self.process(v)?;
200 val.extend(results.into())?;
201 }
202 Some(Value::Null) | None => {
203 if !self.ignore_missing {
204 return ProcessorMissingFieldSnafu {
205 processor: self.kind().to_string(),
206 field: name.to_string(),
207 }
208 .fail();
209 }
210 }
211 Some(v) => {
212 return ProcessorExpectStringSnafu {
213 processor: self.kind().to_string(),
214 v: v.clone(),
215 }
216 .fail();
217 }
218 }
219 }
220 Ok(val)
221 }
222}
223
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::field::Field;

    /// Builds a headerless processor reading the `data` field, with the given
    /// target field names and optional filler for target fields that have no
    /// column.
    fn new_processor(target_fields: &[&str], empty_value: Option<&str>) -> CsvProcessor {
        let mut reader = csv::ReaderBuilder::new();
        reader.has_headers(false);
        CsvProcessor {
            reader,
            fields: Fields::new(vec![Field::new("data", None)]),
            target_fields: target_fields.iter().map(|name| name.to_string()).collect(),
            empty_value: empty_value.map(str::to_string),
            ..Default::default()
        }
    }

    /// Collects `(name, value)` pairs into the map shape returned by `process`.
    fn expected(pairs: Vec<(&str, Value)>) -> BTreeMap<String, Value> {
        pairs
            .into_iter()
            .map(|(name, value)| (name.to_string(), value))
            .collect()
    }

    // Same number of target fields and columns: a one-to-one mapping.
    #[test]
    fn test_equal_length() {
        let processor = new_processor(&["a", "b"], None);

        let result = processor.process("1,2").unwrap();

        let values = expected(vec![
            ("a", Value::String("1".into())),
            ("b", Value::String("2".into())),
        ]);

        assert_eq!(result, values);
    }

    // More target fields than columns: the extra target fields are filled.
    #[test]
    fn test_target_fields_has_more_length() {
        // Without `empty_value` the extra target field becomes null.
        {
            let processor = new_processor(&["a", "b", "c"], None);

            let result = processor.process("1,2").unwrap();

            let values = expected(vec![
                ("a", Value::String("1".into())),
                ("b", Value::String("2".into())),
                ("c", Value::Null),
            ]);

            assert_eq!(result, values);
        }

        // With `empty_value` the extra target field receives that value.
        {
            let processor = new_processor(&["a", "b", "c"], Some("default"));

            let result = processor.process("1,2").unwrap();

            let values = expected(vec![
                ("a", Value::String("1".into())),
                ("b", Value::String("2".into())),
                ("c", Value::String("default".into())),
            ]);

            assert_eq!(result, values);
        }
    }

    // Fewer target fields than columns: the surplus column is dropped.
    // The record must really contain more columns than there are target
    // fields, otherwise this only repeats the equal-length case.
    #[test]
    fn test_target_fields_has_less_length() {
        let processor = new_processor(&["a", "b"], Some("default"));

        let result = processor.process("1,2,3").unwrap();

        let values = expected(vec![
            ("a", Value::String("1".into())),
            ("b", Value::String("2".into())),
        ]);

        assert_eq!(result, values);
    }

    // Input with no record at all is reported as an error.
    #[test]
    fn test_no_record() {
        let processor = new_processor(&["a", "b"], None);

        assert!(processor.process("").is_err());
    }
}