1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime, Utc};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21use vrl::value::{KeyString, Value as VrlValue};
22
23use crate::error::{
24 DateFailedToGetLocalTimezoneSnafu, DateParseSnafu, DateParseTimezoneSnafu, Error,
25 KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorFailedToParseStringSnafu,
26 ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31 FIELD_NAME, IGNORE_MISSING_NAME,
32};
33
/// Kind identifier of the date processor: returned by `DateProcessor::kind`
/// and embedded in this processor's parse-failure error.
pub(crate) const PROCESSOR_DATE: &str = "date";
35
36const FORMATS_NAME: &str = "formats"; const TIMEZONE_NAME: &str = "timezone"; const LOCALE_NAME: &str = "locale";
39const OUTPUT_FORMAT_NAME: &str = "output_format"; lazy_static! {
42 static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
43 "%Y-%m-%dT%H:%M:%S%:z",
45 "%Y-%m-%dT%H:%M:%S%.3f%:z",
46 "%Y-%m-%dT%H:%M:%S%.6f%:z",
47 "%Y-%m-%dT%H:%M:%S%.9f%:z",
48 "%Y-%m-%dT%H:%M:%S%z",
50 "%Y-%m-%dT%H:%M:%S%.3f%z",
51 "%Y-%m-%dT%H:%M:%S%.6f%z",
52 "%Y-%m-%dT%H:%M:%S%.9f%z",
53 "%Y-%m-%dT%H:%M:%SZ",
55 "%Y-%m-%dT%H:%M:%S",
56 "%Y-%m-%dT%H:%M:%S%.3f",
57 "%Y-%m-%dT%H:%M:%S%.6f",
58 "%Y-%m-%dT%H:%M:%S%.9f",
59 ]
60 .iter()
61 .map(|s| Arc::new(s.to_string()))
62 .collect();
63}
64
/// Ordered list of date format patterns, each shared behind an `Arc`.
///
/// `DateProcessor::parse` walks this list front to back and returns the
/// result of the first pattern that parses the input.
#[derive(Debug)]
struct Formats(Vec<Arc<String>>);
67
68impl Default for Formats {
69 fn default() -> Self {
70 Formats(DEFAULT_FORMATS.clone())
71 }
72}
73
74impl Formats {
75 fn new(mut formats: Vec<Arc<String>>) -> Self {
76 formats.sort();
77 formats.dedup();
78 Formats(formats)
79 }
80}
81
82impl std::ops::Deref for Formats {
83 type Target = Vec<Arc<String>>;
84
85 fn deref(&self) -> &Self::Target {
86 &self.0
87 }
88}
89
90impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
91 type Error = Error;
92
93 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
94 let mut fields = Fields::default();
95 let mut formats = Formats::default();
96 let mut timezone = None;
97 let mut locale = None;
98 let mut ignore_missing = false;
99
100 for (k, v) in hash {
101 let key = k
102 .as_str()
103 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
104
105 match key {
106 FIELD_NAME => {
107 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
108 }
109 FIELDS_NAME => {
110 fields = yaml_new_fields(v, FIELDS_NAME)?;
111 }
112
113 FORMATS_NAME => {
114 let format_strs = yaml_strings(v, FORMATS_NAME)?;
115 if format_strs.is_empty() {
116 formats = Formats::new(DEFAULT_FORMATS.clone());
117 } else {
118 formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
119 }
120 }
121 TIMEZONE_NAME => {
122 timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
123 }
124 LOCALE_NAME => {
125 locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
126 }
127 IGNORE_MISSING_NAME => {
128 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
129 }
130
131 _ => {}
132 }
133 }
134
135 let builder = DateProcessor {
136 fields,
137 formats,
138 timezone,
139 locale,
140 ignore_missing,
141 };
142
143 Ok(builder)
144 }
145}
146
/// Pipeline processor that parses string fields into UTC timestamps.
///
/// For every configured field, the input value is parsed with the configured
/// formats and the resulting timestamp is written to the field's target key
/// (or back to the input key when no target is set).
#[derive(Debug, Default)]
pub struct DateProcessor {
    /// Fields to process; each one supplies an input key and an output key.
    pub(crate) fields: Fields,
    /// Format patterns tried in order; the first one that parses wins.
    formats: Formats,
    /// Timezone name, parsed as a `chrono_tz::Tz` on every `parse` call and
    /// applied to inputs that carry no offset. UTC is used when this is `None`.
    timezone: Option<Arc<String>>,
    /// Value of the `locale` configuration key.
    /// NOTE(review): stored but not read by any code visible in this file.
    locale: Option<Arc<String>>,
    /// When `true`, a missing or null input field is skipped instead of
    /// producing a missing-field error.
    ignore_missing: bool,
}
163
164impl DateProcessor {
165 fn parse(&self, val: &str) -> Result<DateTime<Utc>> {
166 let mut tz = Tz::UTC;
167 if let Some(timezone) = &self.timezone {
168 tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
169 value: timezone.as_ref(),
170 })?;
171 }
172
173 for fmt in self.formats.iter() {
174 if let Ok(utc_ts) = try_parse(val, fmt, tz) {
175 return Ok(utc_ts);
176 }
177 }
178
179 ProcessorFailedToParseStringSnafu {
180 kind: PROCESSOR_DATE.to_string(),
181 value: val.to_string(),
182 }
183 .fail()
184 }
185}
186
187impl Processor for DateProcessor {
188 fn kind(&self) -> &str {
189 PROCESSOR_DATE
190 }
191
192 fn ignore_missing(&self) -> bool {
193 self.ignore_missing
194 }
195
196 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
197 for field in self.fields.iter() {
198 let index = field.input_field();
199
200 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
201
202 match val.get(index) {
203 Some(VrlValue::Bytes(s)) => {
204 let timestamp = self.parse(String::from_utf8_lossy(s).as_ref())?;
205 let output_key = field.target_or_input_field();
206 val.insert(KeyString::from(output_key), VrlValue::Timestamp(timestamp));
207 }
208 Some(VrlValue::Null) | None => {
209 if !self.ignore_missing {
210 return ProcessorMissingFieldSnafu {
211 processor: self.kind().to_string(),
212 field: field.input_field().to_string(),
213 }
214 .fail();
215 }
216 }
217 Some(v) => {
218 return ProcessorExpectStringSnafu {
219 processor: self.kind().to_string(),
220 v: v.clone(),
221 }
222 .fail();
223 }
224 }
225 }
226 Ok(val)
227 }
228}
229
230fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<DateTime<Utc>> {
234 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
235 Ok(dt.to_utc())
236 } else {
237 let dt = NaiveDateTime::parse_from_str(val, fmt)
238 .context(DateParseSnafu { value: val })?
239 .and_local_timezone(tz)
240 .single()
241 .context(DateFailedToGetLocalTimezoneSnafu)?;
242 Ok(dt.to_utc())
243 }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use super::Formats;
    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Inputs shared by the `parse` tests: no offset, literal `Z`, colon
    /// offsets and compact offsets, with and without milliseconds.
    const SAMPLE_INPUTS: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Asserts that `processor` parses every entry of `SAMPLE_INPUTS`.
    fn assert_parses_all_samples(processor: &DateProcessor) {
        for &sample in SAMPLE_INPUTS.iter() {
            let parsed = processor.parse(sample);
            assert!(parsed.is_ok());
        }
    }

    #[test]
    fn test_try_parse() {
        // 04:34:56 UTC and 13:34:56 Tokyo local time are the same instant.
        let with_offset = try_parse("2014-5-17T04:34:56+00:00", "%Y-%m-%dT%H:%M:%S%:z", Tokyo);
        assert!(with_offset.is_ok());

        let without_offset = try_parse("2014-5-17T13:34:56", "%Y-%m-%dT%H:%M:%S", Tokyo);
        assert!(without_offset.is_ok());

        assert_eq!(with_offset.unwrap(), without_offset.unwrap());
    }

    #[test]
    fn test_parse() {
        assert_parses_all_samples(&DateProcessor::default());
    }

    #[test]
    fn test_parse_with_formats() {
        let patterns = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ];
        let formats: Vec<Arc<String>> = patterns
            .iter()
            .map(|pattern| Arc::new(pattern.to_string()))
            .collect();

        let processor = DateProcessor {
            formats: Formats(formats),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }
}