pipeline/etl/processor/date.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24    DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25    ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30    FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33use crate::etl::PipelineMap;
34
/// Processor kind string; returned by `DateProcessor::kind` and used in its error messages.
pub(crate) const PROCESSOR_DATE: &str = "date";

/// Config key: list of chrono format strings (falls back to `DEFAULT_FORMATS` when absent).
const FORMATS_NAME: &str = "formats";
/// Config key: time-zone name for values without an explicit offset (UTC when unset).
const TIMEZONE_NAME: &str = "timezone";
/// Config key: locale name; accepted in the config but not otherwise used by this processor.
const LOCALE_NAME: &str = "locale";
/// Config key reserved for an output format; not read by this processor's config parsing.
const OUTPUT_FORMAT_NAME: &str = "output_format";
41
42lazy_static! {
43    static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
44                    // timezone with colon
45                    "%Y-%m-%dT%H:%M:%S%:z",
46                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
47                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
48                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
49                    // timezone without colon
50                    "%Y-%m-%dT%H:%M:%S%z",
51                    "%Y-%m-%dT%H:%M:%S%.3f%z",
52                    "%Y-%m-%dT%H:%M:%S%.6f%z",
53                    "%Y-%m-%dT%H:%M:%S%.9f%z",
54                    // without timezone
55                    "%Y-%m-%dT%H:%M:%SZ",
56                    "%Y-%m-%dT%H:%M:%S",
57                    "%Y-%m-%dT%H:%M:%S%.3f",
58                    "%Y-%m-%dT%H:%M:%S%.6f",
59                    "%Y-%m-%dT%H:%M:%S%.9f",
60                ]
61                .iter()
62                .map(|s| Arc::new(s.to_string()))
63                .collect();
64}
65
66#[derive(Debug)]
67struct Formats(Vec<Arc<String>>);
68
69impl Default for Formats {
70    fn default() -> Self {
71        Formats(DEFAULT_FORMATS.clone())
72    }
73}
74
75impl Formats {
76    fn new(mut formats: Vec<Arc<String>>) -> Self {
77        formats.sort();
78        formats.dedup();
79        Formats(formats)
80    }
81}
82
83impl std::ops::Deref for Formats {
84    type Target = Vec<Arc<String>>;
85
86    fn deref(&self) -> &Self::Target {
87        &self.0
88    }
89}
90
91impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
92    type Error = Error;
93
94    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
95        let mut fields = Fields::default();
96        let mut formats = Formats::default();
97        let mut timezone = None;
98        let mut locale = None;
99        let mut ignore_missing = false;
100
101        for (k, v) in hash {
102            let key = k
103                .as_str()
104                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
105
106            match key {
107                FIELD_NAME => {
108                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
109                }
110                FIELDS_NAME => {
111                    fields = yaml_new_fields(v, FIELDS_NAME)?;
112                }
113
114                FORMATS_NAME => {
115                    let format_strs = yaml_strings(v, FORMATS_NAME)?;
116                    if format_strs.is_empty() {
117                        formats = Formats::new(DEFAULT_FORMATS.clone());
118                    } else {
119                        formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
120                    }
121                }
122                TIMEZONE_NAME => {
123                    timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
124                }
125                LOCALE_NAME => {
126                    locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
127                }
128                IGNORE_MISSING_NAME => {
129                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
130                }
131
132                _ => {}
133            }
134        }
135
136        let builder = DateProcessor {
137            fields,
138            formats,
139            timezone,
140            locale,
141            ignore_missing,
142        };
143
144        Ok(builder)
145    }
146}
147
148/// deprecated it should be removed in the future
149/// Reserved for compatibility only
150#[derive(Debug, Default)]
151pub struct DateProcessor {
152    fields: Fields,
153    formats: Formats,
154    timezone: Option<Arc<String>>,
155    locale: Option<Arc<String>>, // to support locale
156
157    ignore_missing: bool,
158    // description
159    // if
160    // ignore_failure
161    // on_failure
162    // tag
163}
164
165impl DateProcessor {
166    fn parse(&self, val: &str) -> Result<Timestamp> {
167        let mut tz = Tz::UTC;
168        if let Some(timezone) = &self.timezone {
169            tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
170                value: timezone.as_ref(),
171            })?;
172        }
173
174        for fmt in self.formats.iter() {
175            if let Ok(ns) = try_parse(val, fmt, tz) {
176                return Ok(Timestamp::Nanosecond(ns));
177            }
178        }
179
180        ProcessorFailedToParseStringSnafu {
181            kind: PROCESSOR_DATE.to_string(),
182            value: val.to_string(),
183        }
184        .fail()
185    }
186}
187
188impl Processor for DateProcessor {
189    fn kind(&self) -> &str {
190        PROCESSOR_DATE
191    }
192
193    fn ignore_missing(&self) -> bool {
194        self.ignore_missing
195    }
196
197    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
198        for field in self.fields.iter() {
199            let index = field.input_field();
200            match val.get(index) {
201                Some(Value::String(s)) => {
202                    let timestamp = self.parse(s)?;
203                    let output_key = field.target_or_input_field();
204                    val.insert(output_key.to_string(), Value::Timestamp(timestamp));
205                }
206                Some(Value::Null) | None => {
207                    if !self.ignore_missing {
208                        return ProcessorMissingFieldSnafu {
209                            processor: self.kind().to_string(),
210                            field: field.input_field().to_string(),
211                        }
212                        .fail();
213                    }
214                }
215                Some(v) => {
216                    return ProcessorExpectStringSnafu {
217                        processor: self.kind().to_string(),
218                        v: v.clone(),
219                    }
220                    .fail();
221                }
222            }
223        }
224        Ok(())
225    }
226}
227
228/// try to parse val with timezone first, if failed, parse without timezone
229fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
230    if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
231        Ok(dt
232            .timestamp_nanos_opt()
233            .context(DateFailedToGetTimestampSnafu)?)
234    } else {
235        let dt = NaiveDateTime::parse_from_str(val, fmt)
236            .context(DateParseSnafu { value: val })?
237            .and_local_timezone(tz)
238            .single()
239            .context(DateFailedToGetLocalTimezoneSnafu)?;
240        Ok(dt
241            .timestamp_nanos_opt()
242            .context(DateFailedToGetTimestampSnafu)?)
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};
    use crate::etl::value::Timestamp;

    /// Nanoseconds per second, so expected values can be written as epoch seconds.
    const NANOS_PER_SEC: i64 = 1_000_000_000;

    /// 2014-05-17T12:34:56 read as UTC.
    const NAIVE_AS_UTC: i64 = 1_400_330_096 * NANOS_PER_SEC;
    /// 2014-05-17T12:34:56 read in Asia/Tokyo (UTC+09:00), i.e. 03:34:56Z.
    const NAIVE_AS_TOKYO: i64 = 1_400_297_696 * NANOS_PER_SEC;
    /// 2014-05-17T12:34:56+09:30, i.e. 03:04:56Z.
    const PLUS_0930: i64 = 1_400_295_896 * NANOS_PER_SEC;
    /// 2014-05-17T12:34:56-09:30, i.e. 22:04:56Z.
    const MINUS_0930: i64 = 1_400_364_296 * NANOS_PER_SEC;

    /// Parses `value` with `processor` and asserts it yields exactly `expected` nanoseconds.
    fn assert_parses_to(processor: &DateProcessor, value: &str, expected: i64) {
        match processor.parse(value) {
            Ok(Timestamp::Nanosecond(ns)) => {
                assert_eq!(ns, expected, "unexpected timestamp for {value}")
            }
            Ok(_) => panic!("expected a nanosecond timestamp for {value}"),
            Err(e) => panic!("failed to parse {value}: {e:?}"),
        }
    }

    #[test]
    fn test_try_parse() {
        // The same instant, once with an explicit offset and once as Tokyo wall-clock time.
        let time_with_tz = "2014-5-17T04:34:56+00:00";
        let fmt_with_tz = "%Y-%m-%dT%H:%M:%S%:z";

        let time_without_tz = "2014-5-17T13:34:56";
        let fmt_without_tz = "%Y-%m-%dT%H:%M:%S";

        let parsed_with_tz = try_parse(time_with_tz, fmt_with_tz, Tokyo).unwrap();
        let parsed_without_tz = try_parse(time_without_tz, fmt_without_tz, Tokyo).unwrap();

        assert_eq!(parsed_with_tz, 1_400_301_296 * NANOS_PER_SEC);
        assert_eq!(parsed_with_tz, parsed_without_tz);
    }

    #[test]
    fn test_parse() {
        let processor = DateProcessor::default();

        let cases = [
            ("2014-5-17T12:34:56", NAIVE_AS_UTC),
            ("2014-5-17T12:34:56Z", NAIVE_AS_UTC),
            ("2014-5-17T12:34:56+09:30", PLUS_0930),
            ("2014-5-17T12:34:56.000+09:30", PLUS_0930),
            ("2014-5-17T12:34:56-0930", MINUS_0930),
            ("2014-5-17T12:34:56.000-0930", MINUS_0930),
        ];

        for (value, expected) in cases {
            assert_parses_to(&processor, value, expected);
        }
    }

    #[test]
    fn test_parse_with_formats() {
        let formats = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|s| Arc::new(s.to_string()))
        .collect::<Vec<_>>();
        let processor = DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        };

        let cases = [
            ("2014-5-17T12:34:56", NAIVE_AS_UTC),
            ("2014-5-17T12:34:56Z", NAIVE_AS_UTC),
            ("2014-5-17T12:34:56+09:30", PLUS_0930),
            ("2014-5-17T12:34:56.000+09:30", PLUS_0930),
            ("2014-5-17T12:34:56-0930", MINUS_0930),
            ("2014-5-17T12:34:56.000-0930", MINUS_0930),
        ];

        for (value, expected) in cases {
            assert_parses_to(&processor, value, expected);
        }
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        // Naive values take the configured timezone; values with an offset ignore it.
        let cases = [
            ("2014-5-17T12:34:56", NAIVE_AS_TOKYO),
            ("2014-5-17T12:34:56+09:30", PLUS_0930),
            ("2014-5-17T12:34:56.000+09:30", PLUS_0930),
            ("2014-5-17T12:34:56-0930", MINUS_0930),
            ("2014-5-17T12:34:56.000-0930", MINUS_0930),
        ];

        for (value, expected) in cases {
            assert_parses_to(&processor, value, expected);
        }

        // Which instant a trailing literal `Z` maps to under a non-UTC timezone depends on
        // which format matches it; only require that it parses.
        assert!(processor.parse("2014-5-17T12:34:56Z").is_ok());
    }

    #[test]
    fn test_parse_rejects_invalid_input() {
        let processor = DateProcessor::default();
        assert!(processor.parse("not a date").is_err());
        assert!(processor.parse("").is_err());

        let processor = DateProcessor {
            timezone: Some(Arc::new("Not/AZone".to_string())),
            ..Default::default()
        };
        assert!(processor.parse("2014-5-17T12:34:56").is_err());
    }
}