pipeline/etl/processor/
timestamp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
24    DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
25    KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,
26    ProcessorUnsupportedValueSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31    FIELD_NAME, IGNORE_MISSING_NAME,
32};
33use crate::etl::value::time::{
34    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
35    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
36    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
37};
38use crate::etl::value::{Timestamp, Value};
39use crate::etl::PipelineMap;
40
/// Kind identifier of this processor; returned by `Processor::kind` and embedded in the
/// errors raised from this module.
pub(crate) const PROCESSOR_TIMESTAMP: &str = "timestamp";
/// YAML key selecting the unit used to interpret numeric epoch values.
const RESOLUTION_NAME: &str = "resolution";
/// YAML key listing the accepted date-time patterns. When the key is absent the built-in
/// RFC 3339 style patterns are used.
const FORMATS_NAME: &str = "formats";
44
45lazy_static! {
46    static ref DEFAULT_FORMATS: Vec<(Arc<String>,Tz)> = vec![
47                    // timezone with colon
48                    "%Y-%m-%dT%H:%M:%S%:z",
49                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
50                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
51                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
52                    // timezone without colon
53                    "%Y-%m-%dT%H:%M:%S%z",
54                    "%Y-%m-%dT%H:%M:%S%.3f%z",
55                    "%Y-%m-%dT%H:%M:%S%.6f%z",
56                    "%Y-%m-%dT%H:%M:%S%.9f%z",
57                    // without timezone
58                    "%Y-%m-%dT%H:%M:%SZ",
59                    "%Y-%m-%dT%H:%M:%S",
60                    "%Y-%m-%dT%H:%M:%S%.3f",
61                    "%Y-%m-%dT%H:%M:%S%.6f",
62                    "%Y-%m-%dT%H:%M:%S%.9f",
63                ]
64                .iter()
65                .map(|s| (Arc::new(s.to_string()),Tz::UCT))
66                .collect();
67}
68
/// Unit in which a numeric epoch value is interpreted.
///
/// The enum carries no data, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    /// Seconds since the Unix epoch.
    Second,
    /// Milliseconds since the Unix epoch; used when no resolution is configured.
    #[default]
    Milli,
    /// Microseconds since the Unix epoch.
    Micro,
    /// Nanoseconds since the Unix epoch.
    Nano,
}
77
78impl TryFrom<&str> for Resolution {
79    type Error = Error;
80
81    fn try_from(s: &str) -> Result<Self> {
82        match s {
83            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
84            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
85            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
86            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
87            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
88        }
89    }
90}
91
92#[derive(Debug)]
93struct Formats(Vec<(Arc<String>, Tz)>);
94
95impl Formats {
96    fn new(mut formats: Vec<(Arc<String>, Tz)>) -> Self {
97        formats.sort_by_key(|(key, _)| key.clone());
98        formats.dedup();
99        Formats(formats)
100    }
101}
102
103impl Default for Formats {
104    fn default() -> Self {
105        Formats(DEFAULT_FORMATS.clone())
106    }
107}
108
109impl std::ops::Deref for Formats {
110    type Target = Vec<(Arc<String>, Tz)>;
111
112    fn deref(&self) -> &Self::Target {
113        &self.0
114    }
115}
116
117/// support string, integer, float, time, epoch
118#[derive(Debug, Default)]
119pub struct TimestampProcessor {
120    fields: Fields,
121    formats: Formats,
122    resolution: Resolution,
123    ignore_missing: bool,
124    // description
125    // if
126    // ignore_failure
127    // on_failure
128    // tag
129}
130
131impl TimestampProcessor {
132    /// try to parse val with timezone first, if failed, parse without timezone
133    fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
134        if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
135            Ok(dt
136                .timestamp_nanos_opt()
137                .context(DateFailedToGetTimestampSnafu)?)
138        } else {
139            let dt = NaiveDateTime::parse_from_str(val, fmt)
140                .context(DateParseSnafu { value: val })?
141                .and_local_timezone(tz)
142                .single()
143                .context(DateFailedToGetLocalTimezoneSnafu)?;
144            Ok(dt
145                .timestamp_nanos_opt()
146                .context(DateFailedToGetTimestampSnafu)?)
147        }
148    }
149
150    fn parse_time_str(&self, val: &str) -> Result<i64> {
151        for (fmt, tz) in self.formats.iter() {
152            if let Ok(ns) = Self::try_parse(val, fmt, *tz) {
153                return Ok(ns);
154            }
155        }
156        ProcessorFailedToParseStringSnafu {
157            kind: PROCESSOR_TIMESTAMP,
158            value: val.to_string(),
159        }
160        .fail()
161    }
162
163    fn parse(&self, val: &Value) -> Result<Timestamp> {
164        let t: i64 = match val {
165            Value::String(s) => {
166                let t = s.parse::<i64>();
167                match t {
168                    Ok(t) => t,
169                    Err(_) => {
170                        let ns = self.parse_time_str(s)?;
171                        return Ok(Timestamp::Nanosecond(ns));
172                    }
173                }
174            }
175            Value::Int16(i) => *i as i64,
176            Value::Int32(i) => *i as i64,
177            Value::Int64(i) => *i,
178            Value::Uint8(i) => *i as i64,
179            Value::Uint16(i) => *i as i64,
180            Value::Uint32(i) => *i as i64,
181            Value::Uint64(i) => *i as i64,
182            Value::Float32(f) => *f as i64,
183            Value::Float64(f) => *f as i64,
184
185            Value::Timestamp(e) => match self.resolution {
186                Resolution::Second => e.timestamp(),
187                Resolution::Milli => e.timestamp_millis(),
188                Resolution::Micro => e.timestamp_micros(),
189                Resolution::Nano => e.timestamp_nanos(),
190            },
191
192            _ => {
193                return ProcessorUnsupportedValueSnafu {
194                    processor: PROCESSOR_TIMESTAMP,
195                    val: val.to_string(),
196                }
197                .fail();
198            }
199        };
200
201        match self.resolution {
202            Resolution::Second => Ok(Timestamp::Second(t)),
203            Resolution::Milli => Ok(Timestamp::Millisecond(t)),
204            Resolution::Micro => Ok(Timestamp::Microsecond(t)),
205            Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
206        }
207    }
208
209    pub(crate) fn target_count(&self) -> usize {
210        self.fields.len()
211    }
212}
213
214fn parse_formats(yaml: &yaml_rust::yaml::Yaml) -> Result<Vec<(Arc<String>, Tz)>> {
215    match yaml.as_vec() {
216        Some(formats_yaml) => {
217            let mut formats = Vec::with_capacity(formats_yaml.len());
218            for v in formats_yaml {
219                let s = yaml_strings(v, FORMATS_NAME)
220                    .or(yaml_string(v, FORMATS_NAME).map(|s| vec![s]))?;
221                if s.len() != 1 && s.len() != 2 {
222                    return DateInvalidFormatSnafu {
223                        processor: PROCESSOR_TIMESTAMP,
224                        s: format!("{s:?}"),
225                    }
226                    .fail();
227                }
228                let mut iter = s.into_iter();
229                // safety: unwrap is safe here
230                let formatter = iter.next().unwrap();
231                let tz = iter
232                    .next()
233                    .map(|tz| {
234                        tz.parse::<Tz>()
235                            .context(DateParseTimezoneSnafu { value: tz })
236                    })
237                    .unwrap_or(Ok(Tz::UTC))?;
238                formats.push((Arc::new(formatter), tz));
239            }
240            Ok(formats)
241        }
242        None => DateInvalidFormatSnafu {
243            processor: PROCESSOR_TIMESTAMP,
244            s: format!("{yaml:?}"),
245        }
246        .fail(),
247    }
248}
249
250impl TryFrom<&yaml_rust::yaml::Hash> for TimestampProcessor {
251    type Error = Error;
252
253    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
254        let mut fields = Fields::default();
255        let mut formats = Formats::default();
256        let mut resolution = Resolution::default();
257        let mut ignore_missing = false;
258
259        for (k, v) in hash {
260            let key = k
261                .as_str()
262                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
263
264            match key {
265                FIELD_NAME => {
266                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
267                }
268                FIELDS_NAME => {
269                    fields = yaml_new_fields(v, FIELDS_NAME)?;
270                }
271                FORMATS_NAME => {
272                    let formats_vec = parse_formats(v)?;
273                    formats = Formats::new(formats_vec);
274                }
275                RESOLUTION_NAME => {
276                    resolution = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
277                }
278                IGNORE_MISSING_NAME => {
279                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
280                }
281                _ => {}
282            }
283        }
284
285        let processor_builder = TimestampProcessor {
286            fields,
287            formats,
288            resolution,
289            ignore_missing,
290        };
291
292        Ok(processor_builder)
293    }
294}
295
296impl Processor for TimestampProcessor {
297    fn kind(&self) -> &str {
298        PROCESSOR_TIMESTAMP
299    }
300
301    fn ignore_missing(&self) -> bool {
302        self.ignore_missing
303    }
304
305    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
306        for field in self.fields.iter() {
307            let index = field.input_field();
308            match val.get(index) {
309                Some(Value::Null) | None => {
310                    if !self.ignore_missing {
311                        return ProcessorMissingFieldSnafu {
312                            processor: self.kind(),
313                            field: field.input_field(),
314                        }
315                        .fail();
316                    }
317                }
318                Some(v) => {
319                    let result = self.parse(v)?;
320                    let output_key = field.target_or_input_field();
321                    val.insert(output_key.to_string(), Value::Timestamp(result));
322                }
323            }
324        }
325        Ok(val)
326    }
327}
328
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use super::TimestampProcessor;
    use crate::etl::value::{Timestamp, Value};

    /// Date-time strings that each processor configured in these tests must accept.
    const DATE_TIME_SAMPLES: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Builds a processor from the first document of `yaml_str`.
    fn processor_from_yaml(yaml_str: &str) -> TimestampProcessor {
        let docs = YamlLoader::load_from_str(yaml_str).unwrap();
        let config = docs[0].as_hash().unwrap();
        TimestampProcessor::try_from(config).unwrap()
    }

    /// Asserts that `processor` parses every entry of `DATE_TIME_SAMPLES`.
    fn assert_samples_parse(processor: &TimestampProcessor) {
        for sample in DATE_TIME_SAMPLES {
            let parsed = processor.parse(&Value::String(sample.into()));
            assert!(parsed.is_ok());
        }
    }

    #[test]
    fn test_parse_epoch() {
        let processor = processor_from_yaml(
            r#"fields:
  - hello
resolution: s
formats:
  - "%Y-%m-%dT%H:%M:%S%:z"
  - "%Y-%m-%dT%H:%M:%S%.3f%:z"
  - "%Y-%m-%dT%H:%M:%S"
  - "%Y-%m-%dT%H:%M:%SZ"
"#,
        );

        // Float32 is left out on purpose: it cannot represent such an epoch exactly
        // (1573840003.0_f32 as i64 is 1573840000).
        let cases = [
            (
                Value::String("1573840000".into()),
                Timestamp::Second(1573840000),
            ),
            (Value::Int32(1573840001), Timestamp::Second(1573840001)),
            (Value::Uint64(1573840002), Timestamp::Second(1573840002)),
            (
                Value::String("2019-11-15T17:46:40Z".into()),
                Timestamp::Nanosecond(1573840000000000000),
            ),
        ];

        for (input, expected) in cases {
            assert_eq!(processor.parse(&input).unwrap(), expected);
        }

        assert_samples_parse(&processor);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = processor_from_yaml(
            r#"fields:
  - hello
resolution: s
formats:
  - ["%Y-%m-%dT%H:%M:%S%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S%.3f%:z", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%S", "Asia/Tokyo"]
  - ["%Y-%m-%dT%H:%M:%SZ", "Asia/Tokyo"]
"#,
        );

        assert_samples_parse(&processor);
    }
}
419}