1use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23 DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
24 DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
25 KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
26 ProcessorUnsupportedValueSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31 FIELD_NAME, IGNORE_MISSING_NAME,
32};
33use crate::etl::value::time::{
34 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
35 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
36 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
37};
38use crate::etl::value::{Timestamp, Value};
39use crate::etl::PipelineMap;
40
41pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
42const RESOLUTION_NAME: &str = "resolution";
43const FORMATS_NAME: &str = "formats"; lazy_static! {
46 static ref DEFAULT_FORMATS: Vec<(Arc<String>,Tz)> = vec![
47 "%Y-%m-%dT%H:%M:%S%:z",
49 "%Y-%m-%dT%H:%M:%S%.3f%:z",
50 "%Y-%m-%dT%H:%M:%S%.6f%:z",
51 "%Y-%m-%dT%H:%M:%S%.9f%:z",
52 "%Y-%m-%dT%H:%M:%S%z",
54 "%Y-%m-%dT%H:%M:%S%.3f%z",
55 "%Y-%m-%dT%H:%M:%S%.6f%z",
56 "%Y-%m-%dT%H:%M:%S%.9f%z",
57 "%Y-%m-%dT%H:%M:%SZ",
59 "%Y-%m-%dT%H:%M:%S",
60 "%Y-%m-%dT%H:%M:%S%.3f",
61 "%Y-%m-%dT%H:%M:%S%.6f",
62 "%Y-%m-%dT%H:%M:%S%.9f",
63 ]
64 .iter()
65 .map(|s| (Arc::new(s.to_string()),Tz::UCT))
66 .collect();
67}
68
/// Unit in which integer/float epoch inputs are interpreted and in which existing
/// timestamps are re-expressed.
///
/// Defaults to milliseconds. It is a fieldless enum, so it is `Copy` and comparable.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    Second,
    #[default]
    Milli,
    Micro,
    Nano,
}
77
78impl TryFrom<&str> for Resolution {
79 type Error = Error;
80
81 fn try_from(s: &str) -> Result<Self> {
82 match s {
83 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
84 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
85 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
86 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
87 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
88 }
89 }
90}
91
92#[derive(Debug)]
93struct Formats(Vec<(Arc<String>, Tz)>);
94
95impl Formats {
96 fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
97 formats.sort_by_key(|(key, _)| key.clone());
98 formats.dedup();
99 Formats(formats)
100 }
101}
102
103impl Default for Formats {
104 fn default() -> Self {
105 Formats(DEFAULT_FORMATS.clone())
106 }
107}
108
109impl std::ops::Deref for Formats {
110 type Target = Vec<(Arc<String>, Tz)>;
111
112 fn deref(&self) -> &Self::Target {
113 &self.0
114 }
115}
116
/// Pipeline processor that converts the configured input fields into `Value::Timestamp`.
///
/// Accepted inputs are:
/// - strings (an integer epoch or a formatted datetime),
/// - integers and floats (epoch counts),
/// - existing timestamps.
#[derive(Debug, Default)]
pub struct TimestampProcessor {
    /// Fields to convert. Each result is written to the field's target key, or back to its
    /// input key when no target is set.
    fields: Fields,
    /// Datetime formats tried in order for non-numeric strings, each paired with the timezone
    /// used when the parsed text has no UTC offset.
    formats: Formats,
    /// Unit for numeric epoch inputs and for re-expressing existing timestamps.
    /// Formatted datetime strings always produce nanosecond timestamps regardless.
    resolution: Resolution,
    /// When true, missing or `Null` input fields are skipped instead of raising an error.
    ignore_missing: bool,
}
130
131impl TimestampProcessor {
132 fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
134 if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
135 Ok(dt
136 .timestamp_nanos_opt()
137 .context(DateFailedToGetTimestampSnafu)?)
138 } else {
139 let dt = NaiveDateTime::parse_from_str(val, fmt)
140 .context(DateParseSnafu { value: val })?
141 .and_local_timezone(tz)
142 .single()
143 .context(DateFailedToGetLocalTimezoneSnafu)?;
144 Ok(dt
145 .timestamp_nanos_opt()
146 .context(DateFailedToGetTimestampSnafu)?)
147 }
148 }
149
150 fn parse_time_str(&self, val: &str) -> Result<i64> {
151 for (fmt, tz) in self.formats.iter() {
152 if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
153 return Ok(ns);
154 }
155 }
156 ProcessorFailedToParseStringSnafu {
157 kind: PROCESSOR_TIMESTAMP,
158 value: val.to_string(),
159 }
160 .fail()
161 }
162
163 fn parse(&self, val: &Value) -> Result<Timestamp> {
164 let t: i64 = match val {
165 Value::String(s) => {
166 let t = s.parse::<i64>();
167 match t {
168 Ok(t) => t,
169 Err(_) => {
170 let ns = self.parse_time_str(s)?;
171 return Ok(Timestamp::Nanosecond(ns));
172 }
173 }
174 }
175 Value::Int16(i) => *i as i64,
176 Value::Int32(i) => *i as i64,
177 Value::Int64(i) => *i,
178 Value::Uint8(i) => *i as i64,
179 Value::Uint16(i) => *i as i64,
180 Value::Uint32(i) => *i as i64,
181 Value::Uint64(i) => *i as i64,
182 Value::Float32(f) => *f as i64,
183 Value::Float64(f) => *f as i64,
184
185 Value::Timestamp(e) => match self.resolution {
186 Resolution::Second => e.timestamp(),
187 Resolution::Milli => e.timestamp_millis(),
188 Resolution::Micro => e.timestamp_micros(),
189 Resolution::Nano => e.timestamp_nanos(),
190 },
191
192 _ => {
193 return ProcessorUnsupportedValueSnafu {
194 processor: PROCESSOR_TIMESTAMP,
195 val: val.to_string(),
196 }
197 .fail();
198 }
199 };
200
201 match self.resolution {
202 Resolution::Second => Ok(Timestamp::Second(t)),
203 Resolution::Milli => Ok(Timestamp::Millisecond(t)),
204 Resolution::Micro => Ok(Timestamp::Microsecond(t)),
205 Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
206 }
207 }
208
209 pub(crate) fn target_count(&self) -> usize {
210 self.fields.len()
211 }
212}
213
214fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
215 match yaml.as_vec() {
216 Some(formats_yaml) => {
217 let mut formats = Vec::with_capacity(formats_yaml.len());
218 for v in formats_yaml {
219 let s = yaml_strings(v, FORMATS_NAME)
220 .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
221 if s.len() != 1 && s.len() != 2 {
222 return DateInvalidFormatSnafu {
223 processor: PROCESSOR_TIMESTAMP,
224 s: format!("{s:?}"),
225 }
226 .fail();
227 }
228 let mut iter = s.into_iter();
229 let formatter = iter.next().unwrap();
231 let tz = iter
232 .next()
233 .map(|tz| {
234 tz.parse::<Tz>()
235 .context(DateParseTimezoneSnafu { value: tz })
236 })
237 .unwrap_or(Ok(Tz::UTC))?;
238 formats.push((Arc::new(formatter), tz));
239 }
240 Ok(formats)
241 }
242 None => DateInvalidFormatSnafu {
243 processor: PROCESSOR_TIMESTAMP,
244 s: format!("{yaml:?}"),
245 }
246 .fail(),
247 }
248}
249
250impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
251 type Error = Error;
252
253 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
254 let mut fields = Fields::default();
255 let mut formats = Formats::default();
256 let mut resolution = Resolution::default();
257 let mut ignore_missing = false;
258
259 for (k, v) in hash {
260 let key = k
261 .as_str()
262 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
263
264 match key {
265 FIELD_NAME => {
266 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
267 }
268 FIELDS_NAME => {
269 fields = yaml_new_fields(v, FIELDS_NAME)?;
270 }
271 FORMATS_NAME => {
272 let formats_vec = parse_formats(v)?;
273 formats = Formats::new(formats_vec);
274 }
275 RESOLUTION_NAME => {
276 resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
277 }
278 IGNORE_MISSING_NAME => {
279 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
280 }
281 _ => {}
282 }
283 }
284
285 let processor_builder = TimestampProcessor {
286 fields,
287 formats,
288 resolution,
289 ignore_missing,
290 };
291
292 Ok(processor_builder)
293 }
294}
295
296impl Processor for TimestampProcessor {
297 fn kind(&self) -> &str {
298 PROCESSOR_TIMESTAMP
299 }
300
301 fn ignore_missing(&self) -> bool {
302 self.ignore_missing
303 }
304
305 fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
306 for field in self.fields.iter() {
307 let index = field.input_field();
308 match val.get(index) {
309 Some(Value::Null) | None => {
310 if !self.ignore_missing {
311 return ProcessorMissingFieldSnafu {
312 processor: self.kind(),
313 field: field.input_field(),
314 }
315 .fail();
316 }
317 }
318 Some(v) => {
319 let result = self.parse(v)?;
320 let output_key = field.target_or_input_field();
321 val.insert(output_key.to_string(), Value::Timestamp(result));
322 }
323 }
324 }
325 Ok(val)
326 }
327}
328
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::TimestampProcessor;
    use crate::etl::value::{Timestamp, Value};

    /// With `resolution: s`, integer-like inputs (numeric strings and integer values) become
    /// `Timestamp::Second`. A formatted datetime string becomes `Timestamp::Nanosecond`.
    #[test]
    fn test_parse_epoch() {
        let processor_yaml_str = r#"fields:
    - hello
resolution: s
formats:
    - "%Y-%m-%dT%H:%M:%S%:z"
    - "%Y-%m-%dT%H:%M:%S%.3f%:z"
    - "%Y-%m-%dT%H:%M:%S"
    - "%Y-%m-%dT%H:%M:%SZ"
"#;
        let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
        let timestamp_yaml = yaml.as_hash().unwrap();
        let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();

        // (input, exact expected timestamp) pairs.
        let values = [
            (
                Value::String("1573840000".into()),
                Timestamp::Second(1573840000),
            ),
            (Value::Int32(1573840001), Timestamp::Second(1573840001)),
            (Value::Uint64(1573840002), Timestamp::Second(1573840002)),
            (
                Value::String("2019-11-15T17:46:40Z".into()),
                Timestamp::Nanosecond(1573840000000000000),
            ),
        ];

        for (value, result) in values {
            let parsed = processor.parse(&value).unwrap();
            assert_eq!(parsed, result);
        }
        // Inputs that must parse under at least one configured format. Only success is
        // checked here, not the resulting value.
        let values: Vec<&str> = vec![
            "2014-5-17T12:34:56",
            "2014-5-17T12:34:56Z",
            "2014-5-17T12:34:56+09:30",
            "2014-5-17T12:34:56.000+09:30",
            "2014-5-17T12:34:56-0930",
            "2014-5-17T12:34:56.000-0930",
        ]
        .into_iter()
        .collect();

        for value in values {
            let parsed = processor.parse(&Value::String(value.into()));
            assert!(parsed.is_ok());
        }
    }

    /// Same inputs as `test_parse_epoch`, but every format is given as a
    /// `[format, timezone]` pair with `Asia/Tokyo`. That timezone is used for inputs that
    /// carry no offset. Only parse success is checked.
    #[test]
    fn test_parse_with_timezone() {
        let processor_yaml_str = r#"fields:
    - hello
resolution: s
formats:
    - ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
    - ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
    - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
    - ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#;
        let yaml = &YamlLoader::load_from_str(processor_yaml_str).unwrap()[0];
        let timestamp_yaml = yaml.as_hash().unwrap();
        let processor = TimestampProcessor::try_from(timestamp_yaml).unwrap();

        let values: Vec<&str> = vec![
            "2014-5-17T12:34:56",
            "2014-5-17T12:34:56Z",
            "2014-5-17T12:34:56+09:30",
            "2014-5-17T12:34:56.000+09:30",
            "2014-5-17T12:34:56-0930",
            "2014-5-17T12:34:56.000-0930",
        ]
        .into_iter()
        .collect();

        for value in values {
            let parsed = processor.parse(&Value::String(value.into()));
            assert!(parsed.is_ok());
        }
    }
}