1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24 DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25 ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30 FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33use crate::etl::PipelineMap;
34
35pub(crate) const PROCESSOR_DATE: &str = "date";
36
37const FORMATS_NAME: &str = "formats"; const TIMEZONE_NAME: &str = "timezone"; const LOCALE_NAME: &str = "locale";
40const OUTPUT_FORMAT_NAME: &str = "output_format"; lazy_static! {
43 static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
44 "%Y-%m-%dT%H:%M:%S%:z",
46 "%Y-%m-%dT%H:%M:%S%.3f%:z",
47 "%Y-%m-%dT%H:%M:%S%.6f%:z",
48 "%Y-%m-%dT%H:%M:%S%.9f%:z",
49 "%Y-%m-%dT%H:%M:%S%z",
51 "%Y-%m-%dT%H:%M:%S%.3f%z",
52 "%Y-%m-%dT%H:%M:%S%.6f%z",
53 "%Y-%m-%dT%H:%M:%S%.9f%z",
54 "%Y-%m-%dT%H:%M:%SZ",
56 "%Y-%m-%dT%H:%M:%S",
57 "%Y-%m-%dT%H:%M:%S%.3f",
58 "%Y-%m-%dT%H:%M:%S%.6f",
59 "%Y-%m-%dT%H:%M:%S%.9f",
60 ]
61 .iter()
62 .map(|s| Arc::new(s.to_string()))
63 .collect();
64}
65
66#[derive(Debug)]
67struct Formats(Vec<Arc<String>>);
68
69impl Default for Formats {
70 fn default() -> Self {
71 Formats(DEFAULT_FORMATS.clone())
72 }
73}
74
75impl Formats {
76 fn new(mut formats: Vec<Arc<String>>) -> Self {
77 formats.sort();
78 formats.dedup();
79 Formats(formats)
80 }
81}
82
83impl std::ops::Deref for Formats {
84 type Target = Vec<Arc<String>>;
85
86 fn deref(&self) -> &Self::Target {
87 &self.0
88 }
89}
90
91impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
92 type Error = Error;
93
94 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
95 let mut fields = Fields::default();
96 let mut formats = Formats::default();
97 let mut timezone = None;
98 let mut locale = None;
99 let mut ignore_missing = false;
100
101 for (k, v) in hash {
102 let key = k
103 .as_str()
104 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
105
106 match key {
107 FIELD_NAME => {
108 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
109 }
110 FIELDS_NAME => {
111 fields = yaml_new_fields(v, FIELDS_NAME)?;
112 }
113
114 FORMATS_NAME => {
115 let format_strs = yaml_strings(v, FORMATS_NAME)?;
116 if format_strs.is_empty() {
117 formats = Formats::new(DEFAULT_FORMATS.clone());
118 } else {
119 formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
120 }
121 }
122 TIMEZONE_NAME => {
123 timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
124 }
125 LOCALE_NAME => {
126 locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
127 }
128 IGNORE_MISSING_NAME => {
129 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
130 }
131
132 _ => {}
133 }
134 }
135
136 let builder = DateProcessor {
137 fields,
138 formats,
139 timezone,
140 locale,
141 ignore_missing,
142 };
143
144 Ok(builder)
145 }
146}
147
148#[derive(Debug, Default)]
151pub struct DateProcessor {
152 fields: Fields,
153 formats: Formats,
154 timezone: Option<Arc<String>>,
155 locale: Option<Arc<String>>, ignore_missing: bool,
158 }
164
165impl DateProcessor {
166 fn parse(&self, val: &str) -> Result<Timestamp> {
167 let mut tz = Tz::UTC;
168 if let Some(timezone) = &self.timezone {
169 tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
170 value: timezone.as_ref(),
171 })?;
172 }
173
174 for fmt in self.formats.iter() {
175 if let Ok(ns) = try_parse(val, fmt, tz) {
176 return Ok(Timestamp::Nanosecond(ns));
177 }
178 }
179
180 ProcessorFailedToParseStringSnafu {
181 kind: PROCESSOR_DATE.to_string(),
182 value: val.to_string(),
183 }
184 .fail()
185 }
186}
187
188impl Processor for DateProcessor {
189 fn kind(&self) -> &str {
190 PROCESSOR_DATE
191 }
192
193 fn ignore_missing(&self) -> bool {
194 self.ignore_missing
195 }
196
197 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
198 for field in self.fields.iter() {
199 let index = field.input_field();
200 match val.get(index) {
201 Some(Value::String(s)) => {
202 let timestamp = self.parse(s)?;
203 let output_key = field.target_or_input_field();
204 val.insert(output_key.to_string(), Value::Timestamp(timestamp));
205 }
206 Some(Value::Null) | None => {
207 if !self.ignore_missing {
208 return ProcessorMissingFieldSnafu {
209 processor: self.kind().to_string(),
210 field: field.input_field().to_string(),
211 }
212 .fail();
213 }
214 }
215 Some(v) => {
216 return ProcessorExpectStringSnafu {
217 processor: self.kind().to_string(),
218 v: v.clone(),
219 }
220 .fail();
221 }
222 }
223 }
224 Ok(())
225 }
226}
227
228fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
230 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
231 Ok(dt
232 .timestamp_nanos_opt()
233 .context(DateFailedToGetTimestampSnafu)?)
234 } else {
235 let dt = NaiveDateTime::parse_from_str(val, fmt)
236 .context(DateParseSnafu { value: val })?
237 .and_local_timezone(tz)
238 .single()
239 .context(DateFailedToGetLocalTimezoneSnafu)?;
240 Ok(dt
241 .timestamp_nanos_opt()
242 .context(DateFailedToGetTimestampSnafu)?)
243 }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use super::{try_parse, DateProcessor, Formats};

    /// Inputs shared by the `parse` tests: a naive value, a `Z`-suffixed value,
    /// and offset-bearing values (colon and compact form, with and without
    /// fractional seconds).
    const SAMPLE_VALUES: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Asserts that `processor` accepts every entry of `SAMPLE_VALUES`, naming
    /// the rejected input and the error on failure.
    fn assert_parses_all(processor: &DateProcessor) {
        for value in SAMPLE_VALUES {
            if let Err(err) = processor.parse(value) {
                panic!("failed to parse {value:?}: {err:?}");
            }
        }
    }

    #[test]
    fn test_try_parse() {
        // 04:34:56 UTC and 13:34:56 in Tokyo (UTC+9) are the same instant.
        let with_offset = try_parse("2014-5-17T04:34:56+00:00", "%Y-%m-%dT%H:%M:%S%:z", Tokyo)
            .expect("offset-aware input should parse");
        let naive_in_tokyo = try_parse("2014-5-17T13:34:56", "%Y-%m-%dT%H:%M:%S", Tokyo)
            .expect("naive input should parse in Tokyo");

        assert_eq!(with_offset, naive_in_tokyo);
    }

    #[test]
    fn test_parse() {
        assert_parses_all(&DateProcessor::default());
    }

    #[test]
    fn test_parse_with_formats() {
        let formats = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|s| Arc::new(s.to_string()))
        .collect::<Vec<_>>();
        let processor = DateProcessor {
            formats: Formats(formats),
            ..Default::default()
        };

        assert_parses_all(&processor);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        assert_parses_all(&processor);
    }
}