pipeline/etl/processor/
timestamp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
24    DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
25    KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
26    ProcessorUnsupportedValueSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31    FIELD_NAME, IGNORE_MISSING_NAME,
32};
33use crate::etl::value::time::{
34    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
35    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
36    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
37};
38use crate::etl::value::{Timestamp, Value};
39use crate::etl::PipelineMap;
40
/// Processor kind reported by `TimestampProcessor::kind` and used in its error messages.
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
/// YAML key selecting the unit of numeric epoch input (see `Resolution`).
const RESOLUTION_NAME: &str = "resolution";
/// YAML key listing the accepted string formats; when the key is absent the
/// RFC 3339-style `DEFAULT_FORMATS` are used instead.
const FORMATS_NAME: &str = "formats";
44
45lazy_static! {
46    static ref DEFAULT_FORMATS: Vec<(Arc<String>,Tz)> = vec![
47                    // timezone with colon
48                    "%Y-%m-%dT%H:%M:%S%:z",
49                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
50                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
51                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
52                    // timezone without colon
53                    "%Y-%m-%dT%H:%M:%S%z",
54                    "%Y-%m-%dT%H:%M:%S%.3f%z",
55                    "%Y-%m-%dT%H:%M:%S%.6f%z",
56                    "%Y-%m-%dT%H:%M:%S%.9f%z",
57                    // without timezone
58                    "%Y-%m-%dT%H:%M:%SZ",
59                    "%Y-%m-%dT%H:%M:%S",
60                    "%Y-%m-%dT%H:%M:%S%.3f",
61                    "%Y-%m-%dT%H:%M:%S%.6f",
62                    "%Y-%m-%dT%H:%M:%S%.9f",
63                ]
64                .iter()
65                .map(|s| (Arc::new(s.to_string()),Tz::UCT))
66                .collect();
67}
68
/// Unit of numeric epoch values handled by the timestamp processor.
///
/// Integer (and integer-string) input is read as a count of this unit, and
/// existing timestamps are converted into it. Defaults to milliseconds.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    /// Seconds since the Unix epoch.
    Second,
    /// Milliseconds since the Unix epoch (the default).
    #[default]
    Milli,
    /// Microseconds since the Unix epoch.
    Micro,
    /// Nanoseconds since the Unix epoch.
    Nano,
}
77
78impl TryFrom<&str> for Resolution {
79    type Error = Error;
80
81    fn try_from(s: &str) -> Result<Self> {
82        match s {
83            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
84            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
85            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
86            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
87            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
88        }
89    }
90}
91
92#[derive(Debug)]
93struct Formats(Vec<(Arc<String>, Tz)>);
94
95impl Formats {
96    fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
97        formats.sort_by_key(|(key, _)| key.clone());
98        formats.dedup();
99        Formats(formats)
100    }
101}
102
103impl Default for Formats {
104    fn default() -> Self {
105        Formats(DEFAULT_FORMATS.clone())
106    }
107}
108
109impl std::ops::Deref for Formats {
110    type Target = Vec<(Arc<String>, Tz)>;
111
112    fn deref(&self) -> &Self::Target {
113        &self.0
114    }
115}
116
117/// support string, integer, float, time, epoch
118#[derive(Debug, Default)]
119pub struct TimestampProcessor {
120    fields: Fields,
121    formats: Formats,
122    resolution: Resolution,
123    ignore_missing: bool,
124    // description
125    // if
126    // ignore_failure
127    // on_failure
128    // tag
129}
130
131impl TimestampProcessor {
132    /// try to parse val with timezone first, if failed, parse without timezone
133    fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
134        if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
135            Ok(dt
136                .timestamp_nanos_opt()
137                .context(DateFailedToGetTimestampSnafu)?)
138        } else {
139            let dt = NaiveDateTime::parse_from_str(val, fmt)
140                .context(DateParseSnafu { value: val })?
141                .and_local_timezone(tz)
142                .single()
143                .context(DateFailedToGetLocalTimezoneSnafu)?;
144            Ok(dt
145                .timestamp_nanos_opt()
146                .context(DateFailedToGetTimestampSnafu)?)
147        }
148    }
149
150    fn parse_time_str(&self, val: &str) -> Result<i64> {
151        for (fmt, tz) in self.formats.iter() {
152            if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
153                return Ok(ns);
154            }
155        }
156        ProcessorFailedToParseStringSnafu {
157            kind: PROCESSOR_TIMESTAMP,
158            value: val.to_string(),
159        }
160        .fail()
161    }
162
163    fn parse(&self, val: &Value) -> Result<Timestamp> {
164        let t: i64 = match val {
165            Value::String(s) => {
166                let t = s.parse::<i64>();
167                match t {
168                    Ok(t) => t,
169                    Err(_) => {
170                        let ns = self.parse_time_str(s)?;
171                        return Ok(Timestamp::Nanosecond(ns));
172                    }
173                }
174            }
175            Value::Int16(i) => *i as i64,
176            Value::Int32(i) => *i as i64,
177            Value::Int64(i) => *i,
178            Value::Uint8(i) => *i as i64,
179            Value::Uint16(i) => *i as i64,
180            Value::Uint32(i) => *i as i64,
181            Value::Uint64(i) => *i as i64,
182            Value::Float32(f) => *f as i64,
183            Value::Float64(f) => *f as i64,
184
185            Value::Timestamp(e) => match self.resolution {
186                Resolution::Second => e.timestamp(),
187                Resolution::Milli => e.timestamp_millis(),
188                Resolution::Micro => e.timestamp_micros(),
189                Resolution::Nano => e.timestamp_nanos(),
190            },
191
192            _ => {
193                return ProcessorUnsupportedValueSnafu {
194                    processor: PROCESSOR_TIMESTAMP,
195                    val: val.to_string(),
196                }
197                .fail();
198            }
199        };
200
201        match self.resolution {
202            Resolution::Second => Ok(Timestamp::Second(t)),
203            Resolution::Milli => Ok(Timestamp::Millisecond(t)),
204            Resolution::Micro => Ok(Timestamp::Microsecond(t)),
205            Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
206        }
207    }
208}
209
210fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
211    match yaml.as_vec() {
212        Some(formats_yaml) => {
213            let mut formats = Vec::with_capacity(formats_yaml.len());
214            for v in formats_yaml {
215                let s = yaml_strings(v, FORMATS_NAME)
216                    .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
217                if s.len() != 1 && s.len() != 2 {
218                    return DateInvalidFormatSnafu {
219                        processor: PROCESSOR_TIMESTAMP,
220                        s: format!("{s:?}"),
221                    }
222                    .fail();
223                }
224                let mut iter = s.into_iter();
225                // safety: unwrap is safe here
226                let formatter = iter.next().unwrap();
227                let tz = iter
228                    .next()
229                    .map(|tz| {
230                        tz.parse::<Tz>()
231                            .context(DateParseTimezoneSnafu { value: tz })
232                    })
233                    .unwrap_or(Ok(Tz::UTC))?;
234                formats.push((Arc::new(formatter), tz));
235            }
236            Ok(formats)
237        }
238        None => DateInvalidFormatSnafu {
239            processor: PROCESSOR_TIMESTAMP,
240            s: format!("{yaml:?}"),
241        }
242        .fail(),
243    }
244}
245
246impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
247    type Error = Error;
248
249    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
250        let mut fields = Fields::default();
251        let mut formats = Formats::default();
252        let mut resolution = Resolution::default();
253        let mut ignore_missing = false;
254
255        for (k, v) in hash {
256            let key = k
257                .as_str()
258                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
259
260            match key {
261                FIELD_NAME => {
262                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
263                }
264                FIELDS_NAME => {
265                    fields = yaml_new_fields(v, FIELDS_NAME)?;
266                }
267                FORMATS_NAME => {
268                    let formats_vec = parse_formats(v)?;
269                    formats = Formats::new(formats_vec);
270                }
271                RESOLUTION_NAME => {
272                    resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
273                }
274                IGNORE_MISSING_NAME => {
275                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
276                }
277                _ => {}
278            }
279        }
280
281        let processor_builder = TimestampProcessor {
282            fields,
283            formats,
284            resolution,
285            ignore_missing,
286        };
287
288        Ok(processor_builder)
289    }
290}
291
292impl Processor for TimestampProcessor {
293    fn kind(&self) -> &str {
294        PROCESSOR_TIMESTAMP
295    }
296
297    fn ignore_missing(&self) -> bool {
298        self.ignore_missing
299    }
300
301    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
302        for field in self.fields.iter() {
303            let index = field.input_field();
304            match val.get(index) {
305                Some(Value::Null) | None => {
306                    if !self.ignore_missing {
307                        return ProcessorMissingFieldSnafu {
308                            processor: self.kind(),
309                            field: field.input_field(),
310                        }
311                        .fail();
312                    }
313                }
314                Some(v) => {
315                    let result = self.parse(v)?;
316                    let output_key = field.target_or_input_field();
317                    val.insert(output_key.to_string(), Value::Timestamp(result));
318                }
319            }
320        }
321        Ok(())
322    }
323}
324
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::TimestampProcessor;
    use crate::etl::value::{Timestamp, Value};

    /// Mixed inputs: naive, `Z`-suffixed, and offset-bearing strings, with and
    /// without fractional seconds and colons in the offset.
    const MIXED_INPUTS: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Builds a processor from a YAML mapping; panics on malformed test input.
    fn processor_from_yaml(yaml_str: &str) -> TimestampProcessor {
        let docs = YamlLoader::load_from_str(yaml_str).unwrap();
        let hash = docs[0].as_hash().unwrap();
        TimestampProcessor::try_from(hash).unwrap()
    }

    /// Asserts that every input is accepted, reporting the failing input and error.
    fn assert_all_parse(processor: &TimestampProcessor, inputs: &[&str]) {
        for input in inputs {
            let parsed = processor.parse(&Value::String((*input).to_string()));
            assert!(parsed.is_ok(), "failed to parse {input:?}: {parsed:?}");
        }
    }

    #[test]
    fn test_parse_epoch() {
        let processor_yaml_str = r#"fields:
  - hello
resolution: s
formats:
  - "%Y-%m-%dT%H:%M:%S%:z"
  - "%Y-%m-%dT%H:%M:%S%.3f%:z"
  - "%Y-%m-%dT%H:%M:%S"
  - "%Y-%m-%dT%H:%M:%SZ"
"#;
        let processor = processor_from_yaml(processor_yaml_str);

        let cases = [
            (
                Value::String("1573840000".into()),
                Timestamp::Second(1573840000),
            ),
            (Value::Int32(1573840001), Timestamp::Second(1573840001)),
            (Value::Uint64(1573840002), Timestamp::Second(1573840002)),
            // Float32 is deliberately not covered: f32 cannot represent
            // 1573840003 exactly (`1573840003.0_f32 as i64` is 1573840000).
            (
                Value::String("2019-11-15T17:46:40Z".into()),
                Timestamp::Nanosecond(1573840000000000000),
            ),
            // Naive strings fall back to the format's timezone, UTC by default.
            (
                Value::String("2014-5-17T12:34:56".into()),
                Timestamp::Nanosecond(1_400_330_096_000_000_000),
            ),
        ];

        for (value, expected) in cases {
            let parsed = processor.parse(&value).unwrap();
            assert_eq!(parsed, expected, "unexpected result for {value}");
        }

        assert_all_parse(&processor, &MIXED_INPUTS);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor_yaml_str = r#"fields:
  - hello
resolution: s
formats:
  - ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#;
        let processor = processor_from_yaml(processor_yaml_str);

        assert_all_parse(&processor, &MIXED_INPUTS);

        // The configured zone must actually be applied to naive input:
        // 12:34:56 in Asia/Tokyo (UTC+9) is 03:34:56 UTC.
        let naive = processor
            .parse(&Value::String("2014-5-17T12:34:56".into()))
            .unwrap();
        assert_eq!(naive, Timestamp::Nanosecond(1_400_297_696_000_000_000));
    }
}