1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24 DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25 ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30 FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33
34pub(crate) const PROCESSOR_DATE: &str = "date";
35
36const FORMATS_NAME: &str = "formats"; const TIMEZONE_NAME: &str = "timezone"; const LOCALE_NAME: &str = "locale";
39const OUTPUT_FORMAT_NAME: &str = "output_format"; lazy_static! {
42 static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
43 "%Y-%m-%dT%H:%M:%S%:z",
45 "%Y-%m-%dT%H:%M:%S%.3f%:z",
46 "%Y-%m-%dT%H:%M:%S%.6f%:z",
47 "%Y-%m-%dT%H:%M:%S%.9f%:z",
48 "%Y-%m-%dT%H:%M:%S%z",
50 "%Y-%m-%dT%H:%M:%S%.3f%z",
51 "%Y-%m-%dT%H:%M:%S%.6f%z",
52 "%Y-%m-%dT%H:%M:%S%.9f%z",
53 "%Y-%m-%dT%H:%M:%SZ",
55 "%Y-%m-%dT%H:%M:%S",
56 "%Y-%m-%dT%H:%M:%S%.3f",
57 "%Y-%m-%dT%H:%M:%S%.6f",
58 "%Y-%m-%dT%H:%M:%S%.9f",
59 ]
60 .iter()
61 .map(|s| Arc::new(s.to_string()))
62 .collect();
63}
64
65#[derive(Debug)]
66struct Formats(Vec<Arc<String>>);
67
68impl Default for Formats {
69 fn default() -> Self {
70 Formats(DEFAULT_FORMATS.clone())
71 }
72}
73
74impl Formats {
75 fn new(mut formats: Vec<Arc<String>>) -> Self {
76 formats.sort();
77 formats.dedup();
78 Formats(formats)
79 }
80}
81
82impl std::ops::Deref for Formats {
83 type Target = Vec<Arc<String>>;
84
85 fn deref(&self) -> &Self::Target {
86 &self.0
87 }
88}
89
90impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
91 type Error = Error;
92
93 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
94 let mut fields = Fields::default();
95 let mut formats = Formats::default();
96 let mut timezone = None;
97 let mut locale = None;
98 let mut ignore_missing = false;
99
100 for (k, v) in hash {
101 let key = k
102 .as_str()
103 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
104
105 match key {
106 FIELD_NAME => {
107 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
108 }
109 FIELDS_NAME => {
110 fields = yaml_new_fields(v, FIELDS_NAME)?;
111 }
112
113 FORMATS_NAME => {
114 let format_strs = yaml_strings(v, FORMATS_NAME)?;
115 if format_strs.is_empty() {
116 formats = Formats::new(DEFAULT_FORMATS.clone());
117 } else {
118 formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
119 }
120 }
121 TIMEZONE_NAME => {
122 timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
123 }
124 LOCALE_NAME => {
125 locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
126 }
127 IGNORE_MISSING_NAME => {
128 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
129 }
130
131 _ => {}
132 }
133 }
134
135 let builder = DateProcessor {
136 fields,
137 formats,
138 timezone,
139 locale,
140 ignore_missing,
141 };
142
143 Ok(builder)
144 }
145}
146
147#[derive(Debug, Default)]
150pub struct DateProcessor {
151 pub(crate) fields: Fields,
152 formats: Formats,
153 timezone: Option<Arc<String>>,
154 locale: Option<Arc<String>>, ignore_missing: bool,
157 }
163
164impl DateProcessor {
165 fn parse(&self, val: &str) -> Result<Timestamp> {
166 let mut tz = Tz::UTC;
167 if let Some(timezone) = &self.timezone {
168 tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
169 value: timezone.as_ref(),
170 })?;
171 }
172
173 for fmt in self.formats.iter() {
174 if let Ok(ns) = try_parse(val, fmt, tz) {
175 return Ok(Timestamp::Nanosecond(ns));
176 }
177 }
178
179 ProcessorFailedToParseStringSnafu {
180 kind: PROCESSOR_DATE.to_string(),
181 value: val.to_string(),
182 }
183 .fail()
184 }
185}
186
187impl Processor for DateProcessor {
188 fn kind(&self) -> &str {
189 PROCESSOR_DATE
190 }
191
192 fn ignore_missing(&self) -> bool {
193 self.ignore_missing
194 }
195
196 fn exec_mut(&self, mut val: Value) -> Result<Value> {
197 for field in self.fields.iter() {
198 let index = field.input_field();
199 match val.get(index) {
200 Some(Value::String(s)) => {
201 let timestamp = self.parse(s)?;
202 let output_key = field.target_or_input_field();
203 val.insert(output_key.to_string(), Value::Timestamp(timestamp))?;
204 }
205 Some(Value::Null) | None => {
206 if !self.ignore_missing {
207 return ProcessorMissingFieldSnafu {
208 processor: self.kind().to_string(),
209 field: field.input_field().to_string(),
210 }
211 .fail();
212 }
213 }
214 Some(v) => {
215 return ProcessorExpectStringSnafu {
216 processor: self.kind().to_string(),
217 v: v.clone(),
218 }
219 .fail();
220 }
221 }
222 }
223 Ok(val)
224 }
225}
226
227fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
229 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
230 Ok(dt
231 .timestamp_nanos_opt()
232 .context(DateFailedToGetTimestampSnafu)?)
233 } else {
234 let dt = NaiveDateTime::parse_from_str(val, fmt)
235 .context(DateParseSnafu { value: val })?
236 .and_local_timezone(tz)
237 .single()
238 .context(DateFailedToGetLocalTimezoneSnafu)?;
239 Ok(dt
240 .timestamp_nanos_opt()
241 .context(DateFailedToGetTimestampSnafu)?)
242 }
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Inputs shared by the `parse` tests: no offset, literal `Z`, and
    /// offsets with and without a colon or fractional seconds.
    const SAMPLES: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Asserts that `processor` parses every entry of `SAMPLES`.
    fn assert_parses_all_samples(processor: &DateProcessor) {
        for value in SAMPLES.iter() {
            let parsed = processor.parse(value);
            assert!(parsed.is_ok());
        }
    }

    /// The same instant written with an explicit offset and as Tokyo local
    /// time must yield the same timestamp.
    #[test]
    fn test_try_parse() {
        let with_offset = try_parse("2014-5-17T04:34:56+00:00", "%Y-%m-%dT%H:%M:%S%:z", Tokyo);
        assert!(with_offset.is_ok());

        let tokyo_local = try_parse("2014-5-17T13:34:56", "%Y-%m-%dT%H:%M:%S", Tokyo);
        assert!(tokyo_local.is_ok());

        assert_eq!(with_offset.unwrap(), tokyo_local.unwrap());
    }

    /// The default processor accepts every sample.
    #[test]
    fn test_parse() {
        assert_parses_all_samples(&DateProcessor::default());
    }

    /// A reduced, explicit format list still accepts every sample.
    #[test]
    fn test_parse_with_formats() {
        let formats: Vec<Arc<String>> = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|fmt| Arc::new(fmt.to_string()))
        .collect();

        let processor = DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }

    /// A configured time zone does not prevent any sample from parsing.
    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }
}