pipeline/etl/processor/date.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime, Utc};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21use vrl::value::{KeyString, Value as VrlValue};
22
23use crate::error::{
24    DateFailedToGetLocalTimezoneSnafu, DateParseSnafu, DateParseTimezoneSnafu, Error,
25    KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorFailedToParseStringSnafu,
26    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
31    FIELD_NAME, IGNORE_MISSING_NAME,
32};
33
/// Kind name of this processor; returned by `kind()` and embedded in its error messages.
pub(crate) const PROCESSOR_DATE: &str = "date";

/// Config key: list of `chrono` format strings to try.
/// When absent or empty, the built-in `DEFAULT_FORMATS` (RFC 3339-style patterns) are used.
const FORMATS_NAME: &str = "formats";
/// Config key: timezone name parsed as `chrono_tz::Tz` (e.g. `Asia/Tokyo`).
/// It applies to inputs that carry no offset; UTC is used when it is not set.
const TIMEZONE_NAME: &str = "timezone";
/// Config key: locale. Accepted in the config, but not consulted when parsing.
const LOCALE_NAME: &str = "locale";
/// Config key: output format (intended default: same as the input format).
/// NOTE(review): this key is not read by the `TryFrom` config parser in this file.
const OUTPUT_FORMAT_NAME: &str = "output_format";
40
41lazy_static! {
42    static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
43                    // timezone with colon
44                    "%Y-%m-%dT%H:%M:%S%:z",
45                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
46                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
47                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
48                    // timezone without colon
49                    "%Y-%m-%dT%H:%M:%S%z",
50                    "%Y-%m-%dT%H:%M:%S%.3f%z",
51                    "%Y-%m-%dT%H:%M:%S%.6f%z",
52                    "%Y-%m-%dT%H:%M:%S%.9f%z",
53                    // without timezone
54                    "%Y-%m-%dT%H:%M:%SZ",
55                    "%Y-%m-%dT%H:%M:%S",
56                    "%Y-%m-%dT%H:%M:%S%.3f",
57                    "%Y-%m-%dT%H:%M:%S%.6f",
58                    "%Y-%m-%dT%H:%M:%S%.9f",
59                ]
60                .iter()
61                .map(|s| Arc::new(s.to_string()))
62                .collect();
63}
64
65#[derive(Debug)]
66struct Formats(Vec<Arc<String>>);
67
68impl Default for Formats {
69    fn default() -> Self {
70        Formats(DEFAULT_FORMATS.clone())
71    }
72}
73
74impl Formats {
75    fn new(mut formats: Vec<Arc<String>>) -> Self {
76        formats.sort();
77        formats.dedup();
78        Formats(formats)
79    }
80}
81
82impl std::ops::Deref for Formats {
83    type Target = Vec<Arc<String>>;
84
85    fn deref(&self) -> &Self::Target {
86        &self.0
87    }
88}
89
90impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
91    type Error = Error;
92
93    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
94        let mut fields = Fields::default();
95        let mut formats = Formats::default();
96        let mut timezone = None;
97        let mut locale = None;
98        let mut ignore_missing = false;
99
100        for (k, v) in hash {
101            let key = k
102                .as_str()
103                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
104
105            match key {
106                FIELD_NAME => {
107                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
108                }
109                FIELDS_NAME => {
110                    fields = yaml_new_fields(v, FIELDS_NAME)?;
111                }
112
113                FORMATS_NAME => {
114                    let format_strs = yaml_strings(v, FORMATS_NAME)?;
115                    if format_strs.is_empty() {
116                        formats = Formats::new(DEFAULT_FORMATS.clone());
117                    } else {
118                        formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
119                    }
120                }
121                TIMEZONE_NAME => {
122                    timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
123                }
124                LOCALE_NAME => {
125                    locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
126                }
127                IGNORE_MISSING_NAME => {
128                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
129                }
130
131                _ => {}
132            }
133        }
134
135        let builder = DateProcessor {
136            fields,
137            formats,
138            timezone,
139            locale,
140            ignore_missing,
141        };
142
143        Ok(builder)
144    }
145}
146
147/// deprecated it should be removed in the future
148/// Reserved for compatibility only
149#[derive(Debug, Default)]
150pub struct DateProcessor {
151    pub(crate) fields: Fields,
152    formats: Formats,
153    timezone: Option<Arc<String>>,
154    locale: Option<Arc<String>>, // to support locale
155
156    ignore_missing: bool,
157    // description
158    // if
159    // ignore_failure
160    // on_failure
161    // tag
162}
163
164impl DateProcessor {
165    fn parse(&self, val: &str) -> Result<DateTime<Utc>> {
166        let mut tz = Tz::UTC;
167        if let Some(timezone) = &self.timezone {
168            tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
169                value: timezone.as_ref(),
170            })?;
171        }
172
173        for fmt in self.formats.iter() {
174            if let Ok(utc_ts) = try_parse(val, fmt, tz) {
175                return Ok(utc_ts);
176            }
177        }
178
179        ProcessorFailedToParseStringSnafu {
180            kind: PROCESSOR_DATE.to_string(),
181            value: val.to_string(),
182        }
183        .fail()
184    }
185}
186
187impl Processor for DateProcessor {
188    fn kind(&self) -> &str {
189        PROCESSOR_DATE
190    }
191
192    fn ignore_missing(&self) -> bool {
193        self.ignore_missing
194    }
195
196    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
197        for field in self.fields.iter() {
198            let index = field.input_field();
199
200            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
201
202            match val.get(index) {
203                Some(VrlValue::Bytes(s)) => {
204                    let timestamp = self.parse(String::from_utf8_lossy(s).as_ref())?;
205                    let output_key = field.target_or_input_field();
206                    val.insert(KeyString::from(output_key), VrlValue::Timestamp(timestamp));
207                }
208                Some(VrlValue::Null) | None => {
209                    if !self.ignore_missing {
210                        return ProcessorMissingFieldSnafu {
211                            processor: self.kind().to_string(),
212                            field: field.input_field().to_string(),
213                        }
214                        .fail();
215                    }
216                }
217                Some(v) => {
218                    return ProcessorExpectStringSnafu {
219                        processor: self.kind().to_string(),
220                        v: v.clone(),
221                    }
222                    .fail();
223                }
224            }
225        }
226        Ok(val)
227    }
228}
229
230// parse the datetime with timezone info
231// if failed, try to parse using naive date time and add tz info
232// finally convert the datetime to utc
233fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<DateTime<Utc>> {
234    if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
235        Ok(dt.to_utc())
236    } else {
237        let dt = NaiveDateTime::parse_from_str(val, fmt)
238            .context(DateParseSnafu { value: val })?
239            .and_local_timezone(tz)
240            .single()
241            .context(DateFailedToGetLocalTimezoneSnafu)?;
242        Ok(dt.to_utc())
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono::{DateTime, TimeZone, Utc};
    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Shared inputs: naive, `Z`-suffixed, colon and colon-less offsets,
    /// with and without a millisecond fraction.
    const SAMPLE_VALUES: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// The UTC instant 2014-05-17 `h:m:s`.
    fn utc(h: u32, m: u32, s: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2014, 5, 17, h, m, s).unwrap()
    }

    /// Asserts that every sample parses, naming the offending input on failure.
    fn assert_all_parse(processor: &DateProcessor) {
        for value in SAMPLE_VALUES {
            let parsed = processor.parse(value);
            assert!(parsed.is_ok(), "failed to parse {value:?}: {parsed:?}");
        }
    }

    #[test]
    fn test_try_parse() {
        let time_with_tz = "2014-5-17T04:34:56+00:00";
        let fmt_with_tz = "%Y-%m-%dT%H:%M:%S%:z";

        let time_without_tz = "2014-5-17T13:34:56";
        let fmt_without_tz = "%Y-%m-%dT%H:%M:%S";

        let tz = Tokyo;

        let parsed_with_tz = try_parse(time_with_tz, fmt_with_tz, tz);
        assert!(parsed_with_tz.is_ok());

        let parsed_without_tz = try_parse(time_without_tz, fmt_without_tz, tz);
        assert!(parsed_without_tz.is_ok());

        assert_eq!(parsed_with_tz.unwrap(), parsed_without_tz.unwrap());
    }

    #[test]
    fn test_parse() {
        assert_all_parse(&DateProcessor::default());
    }

    #[test]
    fn test_parse_values() {
        let processor = DateProcessor::default();
        let expected = [
            ("2014-5-17T12:34:56", utc(12, 34, 56)),
            ("2014-5-17T12:34:56Z", utc(12, 34, 56)),
            ("2014-5-17T12:34:56+09:30", utc(3, 4, 56)),
            ("2014-5-17T12:34:56.000+09:30", utc(3, 4, 56)),
            ("2014-5-17T12:34:56-0930", utc(22, 4, 56)),
            ("2014-5-17T12:34:56.000-0930", utc(22, 4, 56)),
        ];
        for (value, want) in expected {
            assert_eq!(processor.parse(value).unwrap(), want, "input {value:?}");
        }
    }

    #[test]
    fn test_parse_with_formats() {
        let formats = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|s| Arc::new(s.to_string()))
        .collect::<Vec<_>>();
        let processor = DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        };

        assert_all_parse(&processor);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        assert_all_parse(&processor);

        // Input without an offset is read in the configured timezone (UTC+9).
        assert_eq!(
            processor.parse("2014-5-17T12:34:56").unwrap(),
            utc(3, 34, 56)
        );
        // An explicit offset takes precedence over the configured timezone.
        assert_eq!(
            processor.parse("2014-5-17T12:34:56+09:30").unwrap(),
            utc(3, 4, 56)
        );
    }

    #[test]
    fn test_parse_with_invalid_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Not/AZone".to_string())),
            ..Default::default()
        };
        assert!(processor.parse("2014-5-17T12:34:56").is_err());
    }

    #[test]
    fn test_parse_rejects_unparseable_input() {
        let processor = DateProcessor::default();
        assert!(processor.parse("not a date").is_err());
        assert!(processor.parse("").is_err());
    }
}