1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
24 DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
25 KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
26 ProcessorUnsupportedValueSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31 FIELD_NAME, IGNORE_MISSING_NAME,
32};
33use crate::etl::value::time::{
34 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
35 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
36 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
37};
38use crate::etl::value::{Timestamp, Value};
39use crate::etl::PipelineMap;
40
41pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
42const RESOLUTION_NAME: &str = "resolution";
43const FORMATS_NAME: &str = "formats"; lazy_static! {
46 static ref DEFAULT_FORMATS: Vec<(Arc<String>,Tz)> = vec![
47 "%Y-%m-%dT%H:%M:%S%:z",
49 "%Y-%m-%dT%H:%M:%S%.3f%:z",
50 "%Y-%m-%dT%H:%M:%S%.6f%:z",
51 "%Y-%m-%dT%H:%M:%S%.9f%:z",
52 "%Y-%m-%dT%H:%M:%S%z",
54 "%Y-%m-%dT%H:%M:%S%.3f%z",
55 "%Y-%m-%dT%H:%M:%S%.6f%z",
56 "%Y-%m-%dT%H:%M:%S%.9f%z",
57 "%Y-%m-%dT%H:%M:%SZ",
59 "%Y-%m-%dT%H:%M:%S",
60 "%Y-%m-%dT%H:%M:%S%.3f",
61 "%Y-%m-%dT%H:%M:%S%.6f",
62 "%Y-%m-%dT%H:%M:%S%.9f",
63 ]
64 .iter()
65 .map(|s| (Arc::new(s.to_string()),Tz::UCT))
66 .collect();
67}
68
69#[derive(Debug, Default)]
70enum Resolution {
71 Second,
72 #[default]
73 Milli,
74 Micro,
75 Nano,
76}
77
78impl TryFrom<&str> for Resolution {
79 type Error = Error;
80
81 fn try_from(s: &str) -> Result<Self> {
82 match s {
83 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
84 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
85 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
86 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
87 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
88 }
89 }
90}
91
92#[derive(Debug)]
93struct Formats(Vec<(Arc<String>, Tz)>);
94
95impl Formats {
96 fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
97 formats.sort_by_key(|(key, _)| key.clone());
98 formats.dedup();
99 Formats(formats)
100 }
101}
102
103impl Default for Formats {
104 fn default() -> Self {
105 Formats(DEFAULT_FORMATS.clone())
106 }
107}
108
109impl std::ops::Deref for Formats {
110 type Target = Vec<(Arc<String>, Tz)>;
111
112 fn deref(&self) -> &Self::Target {
113 &self.0
114 }
115}
116
/// Pipeline processor that converts each configured field into a `Value::Timestamp`.
///
/// Numeric values and strings that parse as `i64` are read as epoch values in `resolution`.
/// Other strings are parsed with `formats` and yield nanoseconds. Values that already are
/// timestamps are re-expressed in `resolution`.
#[derive(Debug, Default)]
pub struct TimestampProcessor {
    /// Fields to convert. Each result is stored under `field.target_or_input_field()`.
    fields: Fields,
    /// `(strftime format, timezone)` pairs, tried in turn for non-numeric strings.
    formats: Formats,
    /// Unit for numeric input and for re-expressing existing timestamps (default: milliseconds).
    resolution: Resolution,
    /// When `true`, a missing or `Null` field is skipped instead of producing an error.
    ignore_missing: bool,
}
130
131impl TimestampProcessor {
132 fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
134 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
135 Ok(dt
136 .timestamp_nanos_opt()
137 .context(DateFailedToGetTimestampSnafu)?)
138 } else {
139 let dt = NaiveDateTime::parse_from_str(val, fmt)
140 .context(DateParseSnafu { value: val })?
141 .and_local_timezone(tz)
142 .single()
143 .context(DateFailedToGetLocalTimezoneSnafu)?;
144 Ok(dt
145 .timestamp_nanos_opt()
146 .context(DateFailedToGetTimestampSnafu)?)
147 }
148 }
149
150 fn parse_time_str(&self, val: &str) -> Result<i64> {
151 for (fmt, tz) in self.formats.iter() {
152 if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
153 return Ok(ns);
154 }
155 }
156 ProcessorFailedToParseStringSnafu {
157 kind: PROCESSOR_TIMESTAMP,
158 value: val.to_string(),
159 }
160 .fail()
161 }
162
163 fn parse(&self, val: &Value) -> Result<Timestamp> {
164 let t: i64 = match val {
165 Value::String(s) => {
166 let t = s.parse::<i64>();
167 match t {
168 Ok(t) => t,
169 Err(_) => {
170 let ns = self.parse_time_str(s)?;
171 return Ok(Timestamp::Nanosecond(ns));
172 }
173 }
174 }
175 Value::Int16(i) => *i as i64,
176 Value::Int32(i) => *i as i64,
177 Value::Int64(i) => *i,
178 Value::Uint8(i) => *i as i64,
179 Value::Uint16(i) => *i as i64,
180 Value::Uint32(i) => *i as i64,
181 Value::Uint64(i) => *i as i64,
182 Value::Float32(f) => *f as i64,
183 Value::Float64(f) => *f as i64,
184
185 Value::Timestamp(e) => match self.resolution {
186 Resolution::Second => e.timestamp(),
187 Resolution::Milli => e.timestamp_millis(),
188 Resolution::Micro => e.timestamp_micros(),
189 Resolution::Nano => e.timestamp_nanos(),
190 },
191
192 _ => {
193 return ProcessorUnsupportedValueSnafu {
194 processor: PROCESSOR_TIMESTAMP,
195 val: val.to_string(),
196 }
197 .fail();
198 }
199 };
200
201 match self.resolution {
202 Resolution::Second => Ok(Timestamp::Second(t)),
203 Resolution::Milli => Ok(Timestamp::Millisecond(t)),
204 Resolution::Micro => Ok(Timestamp::Microsecond(t)),
205 Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
206 }
207 }
208}
209
210fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
211 match yaml.as_vec() {
212 Some(formats_yaml) => {
213 let mut formats = Vec::with_capacity(formats_yaml.len());
214 for v in formats_yaml {
215 let s = yaml_strings(v, FORMATS_NAME)
216 .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
217 if s.len() != 1 && s.len() != 2 {
218 return DateInvalidFormatSnafu {
219 processor: PROCESSOR_TIMESTAMP,
220 s: format!("{s:?}"),
221 }
222 .fail();
223 }
224 let mut iter = s.into_iter();
225 let formatter = iter.next().unwrap();
227 let tz = iter
228 .next()
229 .map(|tz| {
230 tz.parse::<Tz>()
231 .context(DateParseTimezoneSnafu { value: tz })
232 })
233 .unwrap_or(Ok(Tz::UTC))?;
234 formats.push((Arc::new(formatter), tz));
235 }
236 Ok(formats)
237 }
238 None => DateInvalidFormatSnafu {
239 processor: PROCESSOR_TIMESTAMP,
240 s: format!("{yaml:?}"),
241 }
242 .fail(),
243 }
244}
245
246impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
247 type Error = Error;
248
249 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
250 let mut fields = Fields::default();
251 let mut formats = Formats::default();
252 let mut resolution = Resolution::default();
253 let mut ignore_missing = false;
254
255 for (k, v) in hash {
256 let key = k
257 .as_str()
258 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
259
260 match key {
261 FIELD_NAME => {
262 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
263 }
264 FIELDS_NAME => {
265 fields = yaml_new_fields(v, FIELDS_NAME)?;
266 }
267 FORMATS_NAME => {
268 let formats_vec = parse_formats(v)?;
269 formats = Formats::new(formats_vec);
270 }
271 RESOLUTION_NAME => {
272 resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
273 }
274 IGNORE_MISSING_NAME => {
275 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
276 }
277 _ => {}
278 }
279 }
280
281 let processor_builder = TimestampProcessor {
282 fields,
283 formats,
284 resolution,
285 ignore_missing,
286 };
287
288 Ok(processor_builder)
289 }
290}
291
292impl Processor for TimestampProcessor {
293 fn kind(&self) -> &str {
294 PROCESSOR_TIMESTAMP
295 }
296
297 fn ignore_missing(&self) -> bool {
298 self.ignore_missing
299 }
300
301 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
302 for field in self.fields.iter() {
303 let index = field.input_field();
304 match val.get(index) {
305 Some(Value::Null) | None => {
306 if !self.ignore_missing {
307 return ProcessorMissingFieldSnafu {
308 processor: self.kind(),
309 field: field.input_field(),
310 }
311 .fail();
312 }
313 }
314 Some(v) => {
315 let result = self.parse(v)?;
316 let output_key = field.target_or_input_field();
317 val.insert(output_key.to_string(), Value::Timestamp(result));
318 }
319 }
320 }
321 Ok(())
322 }
323}
324
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::TimestampProcessor;
    use crate::etl::value::{Timestamp, Value};

    /// Builds a processor from a YAML mapping; panics on malformed test input.
    fn processor_from_yaml(yaml_str: &str) -> TimestampProcessor {
        let docs = YamlLoader::load_from_str(yaml_str).unwrap();
        let hash = docs[0].as_hash().unwrap();
        TimestampProcessor::try_from(hash).unwrap()
    }

    #[test]
    fn test_parse_epoch() {
        let processor = processor_from_yaml(
            r#"fields:
  - hello
resolution: s
formats:
  - "%Y-%m-%dT%H:%M:%S%:z"
  - "%Y-%m-%dT%H:%M:%S%.3f%:z"
  - "%Y-%m-%dT%H:%M:%S"
  - "%Y-%m-%dT%H:%M:%SZ"
"#,
        );

        let cases = [
            (
                Value::String("1573840000".into()),
                Timestamp::Second(1573840000),
            ),
            (Value::Int32(1573840001), Timestamp::Second(1573840001)),
            (Value::Uint64(1573840002), Timestamp::Second(1573840002)),
            (
                Value::String("2019-11-15T17:46:40Z".into()),
                Timestamp::Nanosecond(1573840000000000000),
            ),
        ];
        for (value, expected) in cases {
            assert_eq!(processor.parse(&value).unwrap(), expected);
        }

        for value in [
            "2014-5-17T12:34:56",
            "2014-5-17T12:34:56Z",
            "2014-5-17T12:34:56+09:30",
            "2014-5-17T12:34:56.000+09:30",
            "2014-5-17T12:34:56-0930",
            "2014-5-17T12:34:56.000-0930",
        ] {
            let parsed = processor.parse(&Value::String(value.into()));
            assert!(parsed.is_ok(), "failed to parse {value}");
        }
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = processor_from_yaml(
            r#"fields:
  - hello
resolution: s
formats:
  - ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#,
        );

        // A naive input is interpreted in the configured zone (Asia/Tokyo, UTC+9):
        // 2014-05-17T12:34:56+09:00 == 2014-05-17T03:34:56Z.
        assert_eq!(
            processor
                .parse(&Value::String("2014-5-17T12:34:56".into()))
                .unwrap(),
            Timestamp::Nanosecond(1_400_297_696_000_000_000),
        );

        // An explicit offset in the input takes precedence over the configured zone:
        // 2014-05-17T12:34:56+09:30 == 2014-05-17T03:04:56Z.
        assert_eq!(
            processor
                .parse(&Value::String("2014-5-17T12:34:56+09:30".into()))
                .unwrap(),
            Timestamp::Nanosecond(1_400_295_896_000_000_000),
        );

        for value in [
            "2014-5-17T12:34:56",
            "2014-5-17T12:34:56Z",
            "2014-5-17T12:34:56+09:30",
            "2014-5-17T12:34:56.000+09:30",
            "2014-5-17T12:34:56-0930",
            "2014-5-17T12:34:56.000-0930",
        ] {
            let parsed = processor.parse(&Value::String(value.into()));
            assert!(parsed.is_ok(), "failed to parse {value}");
        }
    }
}