1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24 DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25 ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30 FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33use crate::etl::PipelineMap;
34
35pub(crate) const PROCESSOR_DATE: &str = "date";
36
37const FORMATS_NAME: &str = "formats"; const TIMEZONE_NAME: &str = "timezone"; const LOCALE_NAME: &str = "locale";
40const OUTPUT_FORMAT_NAME: &str = "output_format"; lazy_static! {
43 static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
44 "%Y-%m-%dT%H:%M:%S%:z",
46 "%Y-%m-%dT%H:%M:%S%.3f%:z",
47 "%Y-%m-%dT%H:%M:%S%.6f%:z",
48 "%Y-%m-%dT%H:%M:%S%.9f%:z",
49 "%Y-%m-%dT%H:%M:%S%z",
51 "%Y-%m-%dT%H:%M:%S%.3f%z",
52 "%Y-%m-%dT%H:%M:%S%.6f%z",
53 "%Y-%m-%dT%H:%M:%S%.9f%z",
54 "%Y-%m-%dT%H:%M:%SZ",
56 "%Y-%m-%dT%H:%M:%S",
57 "%Y-%m-%dT%H:%M:%S%.3f",
58 "%Y-%m-%dT%H:%M:%S%.6f",
59 "%Y-%m-%dT%H:%M:%S%.9f",
60 ]
61 .iter()
62 .map(|s| Arc::new(s.to_string()))
63 .collect();
64}
65
66#[derive(Debug)]
67struct Formats(Vec<Arc<String>>);
68
69impl Default for Formats {
70 fn default() -> Self {
71 Formats(DEFAULT_FORMATS.clone())
72 }
73}
74
75impl Formats {
76 fn new(mut formats: Vec<Arc<String>>) -> Self {
77 formats.sort();
78 formats.dedup();
79 Formats(formats)
80 }
81}
82
83impl std::ops::Deref for Formats {
84 type Target = Vec<Arc<String>>;
85
86 fn deref(&self) -> &Self::Target {
87 &self.0
88 }
89}
90
91impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
92 type Error = Error;
93
94 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
95 let mut fields = Fields::default();
96 let mut formats = Formats::default();
97 let mut timezone = None;
98 let mut locale = None;
99 let mut ignore_missing = false;
100
101 for (k, v) in hash {
102 let key = k
103 .as_str()
104 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
105
106 match key {
107 FIELD_NAME => {
108 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
109 }
110 FIELDS_NAME => {
111 fields = yaml_new_fields(v, FIELDS_NAME)?;
112 }
113
114 FORMATS_NAME => {
115 let format_strs = yaml_strings(v, FORMATS_NAME)?;
116 if format_strs.is_empty() {
117 formats = Formats::new(DEFAULT_FORMATS.clone());
118 } else {
119 formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
120 }
121 }
122 TIMEZONE_NAME => {
123 timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
124 }
125 LOCALE_NAME => {
126 locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
127 }
128 IGNORE_MISSING_NAME => {
129 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
130 }
131
132 _ => {}
133 }
134 }
135
136 let builder = DateProcessor {
137 fields,
138 formats,
139 timezone,
140 locale,
141 ignore_missing,
142 };
143
144 Ok(builder)
145 }
146}
147
148#[derive(Debug, Default)]
151pub struct DateProcessor {
152 fields: Fields,
153 formats: Formats,
154 timezone: Option<Arc<String>>,
155 locale: Option<Arc<String>>, ignore_missing: bool,
158 }
164
165impl DateProcessor {
166 pub(crate) fn target_count(&self) -> usize {
167 self.fields.len()
168 }
169
170 fn parse(&self, val: &str) -> Result<Timestamp> {
171 let mut tz = Tz::UTC;
172 if let Some(timezone) = &self.timezone {
173 tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
174 value: timezone.as_ref(),
175 })?;
176 }
177
178 for fmt in self.formats.iter() {
179 if let Ok(ns) = try_parse(val, fmt, tz) {
180 return Ok(Timestamp::Nanosecond(ns));
181 }
182 }
183
184 ProcessorFailedToParseStringSnafu {
185 kind: PROCESSOR_DATE.to_string(),
186 value: val.to_string(),
187 }
188 .fail()
189 }
190}
191
192impl Processor for DateProcessor {
193 fn kind(&self) -> &str {
194 PROCESSOR_DATE
195 }
196
197 fn ignore_missing(&self) -> bool {
198 self.ignore_missing
199 }
200
201 fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
202 for field in self.fields.iter() {
203 let index = field.input_field();
204 match val.get(index) {
205 Some(Value::String(s)) => {
206 let timestamp = self.parse(s)?;
207 let output_key = field.target_or_input_field();
208 val.insert(output_key.to_string(), Value::Timestamp(timestamp));
209 }
210 Some(Value::Null) | None => {
211 if !self.ignore_missing {
212 return ProcessorMissingFieldSnafu {
213 processor: self.kind().to_string(),
214 field: field.input_field().to_string(),
215 }
216 .fail();
217 }
218 }
219 Some(v) => {
220 return ProcessorExpectStringSnafu {
221 processor: self.kind().to_string(),
222 v: v.clone(),
223 }
224 .fail();
225 }
226 }
227 }
228 Ok(val)
229 }
230}
231
232fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
234 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
235 Ok(dt
236 .timestamp_nanos_opt()
237 .context(DateFailedToGetTimestampSnafu)?)
238 } else {
239 let dt = NaiveDateTime::parse_from_str(val, fmt)
240 .context(DateParseSnafu { value: val })?
241 .and_local_timezone(tz)
242 .single()
243 .context(DateFailedToGetLocalTimezoneSnafu)?;
244 Ok(dt
245 .timestamp_nanos_opt()
246 .context(DateFailedToGetTimestampSnafu)?)
247 }
248}
249
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Inputs that every processor configuration below must accept.
    const SAMPLE_INPUTS: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Asserts that `processor` parses every entry of `SAMPLE_INPUTS`.
    fn assert_parses_all(processor: &DateProcessor) {
        for input in SAMPLE_INPUTS {
            assert!(processor.parse(input).is_ok(), "failed to parse {}", input);
        }
    }

    #[test]
    fn test_try_parse() {
        // 04:34:56 UTC and 13:34:56 in Tokyo (UTC+9) are the same instant.
        let with_offset = try_parse("2014-5-17T04:34:56+00:00", "%Y-%m-%dT%H:%M:%S%:z", Tokyo)
            .expect("offset-aware input should parse");
        let localised = try_parse("2014-5-17T13:34:56", "%Y-%m-%dT%H:%M:%S", Tokyo)
            .expect("naive input should parse");
        assert_eq!(with_offset, localised);
    }

    #[test]
    fn test_parse() {
        assert_parses_all(&DateProcessor::default());
    }

    #[test]
    fn test_parse_with_formats() {
        let formats: Vec<Arc<String>> = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|fmt| Arc::new((*fmt).to_owned()))
        .collect();

        assert_parses_all(&DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        });
    }

    #[test]
    fn test_parse_with_timezone() {
        assert_parses_all(&DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        });
    }
}