pipeline/etl/processor/
date.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24    DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25    ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30    FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33
/// Kind name of this processor; returned by `DateProcessor::kind` and embedded in
/// the error raised when no format matches a value.
pub(crate) const PROCESSOR_DATE: &str = "date";

// Option keys read from the processor's YAML configuration mapping.

/// Key holding the list of format strings to try. When the key is absent the
/// built-in `DEFAULT_FORMATS` (RFC 3339 style) are used.
const FORMATS_NAME: &str = "formats";
/// Key holding the timezone name applied to values without an offset; UTC when absent.
const TIMEZONE_NAME: &str = "timezone";
/// Key holding the locale. The value is stored on the processor but is not
/// consulted by the parsing code in this file.
const LOCALE_NAME: &str = "locale";
/// Key for an output format (original note: "default with input format").
/// NOTE(review): not referenced anywhere in the visible file — presumably reserved; confirm.
const OUTPUT_FORMAT_NAME: &str = "output_format";
40
41lazy_static! {
42    static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
43                    // timezone with colon
44                    "%Y-%m-%dT%H:%M:%S%:z",
45                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
46                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
47                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
48                    // timezone without colon
49                    "%Y-%m-%dT%H:%M:%S%z",
50                    "%Y-%m-%dT%H:%M:%S%.3f%z",
51                    "%Y-%m-%dT%H:%M:%S%.6f%z",
52                    "%Y-%m-%dT%H:%M:%S%.9f%z",
53                    // without timezone
54                    "%Y-%m-%dT%H:%M:%SZ",
55                    "%Y-%m-%dT%H:%M:%S",
56                    "%Y-%m-%dT%H:%M:%S%.3f",
57                    "%Y-%m-%dT%H:%M:%S%.6f",
58                    "%Y-%m-%dT%H:%M:%S%.9f",
59                ]
60                .iter()
61                .map(|s| Arc::new(s.to_string()))
62                .collect();
63}
64
/// List of format strings the date processor tries when parsing a value.
///
/// `DateProcessor::parse` walks the list front to back and stops at the first
/// format that parses the input. Dereferences to the inner `Vec`.
#[derive(Debug)]
struct Formats(Vec<Arc<String>>);
67
68impl Default for Formats {
69    fn default() -> Self {
70        Formats(DEFAULT_FORMATS.clone())
71    }
72}
73
74impl Formats {
75    fn new(mut formats: Vec<Arc<String>>) -> Self {
76        formats.sort();
77        formats.dedup();
78        Formats(formats)
79    }
80}
81
82impl std::ops::Deref for Formats {
83    type Target = Vec<Arc<String>>;
84
85    fn deref(&self) -> &Self::Target {
86        &self.0
87    }
88}
89
90impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
91    type Error = Error;
92
93    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
94        let mut fields = Fields::default();
95        let mut formats = Formats::default();
96        let mut timezone = None;
97        let mut locale = None;
98        let mut ignore_missing = false;
99
100        for (k, v) in hash {
101            let key = k
102                .as_str()
103                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
104
105            match key {
106                FIELD_NAME => {
107                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
108                }
109                FIELDS_NAME => {
110                    fields = yaml_new_fields(v, FIELDS_NAME)?;
111                }
112
113                FORMATS_NAME => {
114                    let format_strs = yaml_strings(v, FORMATS_NAME)?;
115                    if format_strs.is_empty() {
116                        formats = Formats::new(DEFAULT_FORMATS.clone());
117                    } else {
118                        formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
119                    }
120                }
121                TIMEZONE_NAME => {
122                    timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
123                }
124                LOCALE_NAME => {
125                    locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
126                }
127                IGNORE_MISSING_NAME => {
128                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
129                }
130
131                _ => {}
132            }
133        }
134
135        let builder = DateProcessor {
136            fields,
137            formats,
138            timezone,
139            locale,
140            ignore_missing,
141        };
142
143        Ok(builder)
144    }
145}
146
/// Processor that parses string fields into nanosecond-precision timestamps.
///
/// Deprecated: kept only for compatibility and expected to be removed in the future.
#[derive(Debug, Default)]
pub struct DateProcessor {
    /// Fields to convert. Each one names the input key to read and the key the
    /// resulting timestamp is written to.
    pub(crate) fields: Fields,
    /// Format strings tried one after another until one parses the value.
    formats: Formats,
    /// Timezone name, parsed as a `chrono_tz::Tz` (e.g. `Asia/Tokyo`), applied to
    /// values that carry no UTC offset. UTC is used when `None`.
    timezone: Option<Arc<String>>,
    /// Locale taken from the configuration. It is stored but not consulted by the
    /// parsing code in this file.
    locale: Option<Arc<String>>,

    /// When `true`, a missing or null input field is skipped instead of raising an error.
    ignore_missing: bool,
    // Options that are not implemented here.
    // NOTE(review): presumably planned for later — confirm:
    // description
    // if
    // ignore_failure
    // on_failure
    // tag
}
163
164impl DateProcessor {
165    fn parse(&self, val: &str) -> Result<Timestamp> {
166        let mut tz = Tz::UTC;
167        if let Some(timezone) = &self.timezone {
168            tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
169                value: timezone.as_ref(),
170            })?;
171        }
172
173        for fmt in self.formats.iter() {
174            if let Ok(ns) = try_parse(val, fmt, tz) {
175                return Ok(Timestamp::Nanosecond(ns));
176            }
177        }
178
179        ProcessorFailedToParseStringSnafu {
180            kind: PROCESSOR_DATE.to_string(),
181            value: val.to_string(),
182        }
183        .fail()
184    }
185}
186
187impl Processor for DateProcessor {
188    fn kind(&self) -> &str {
189        PROCESSOR_DATE
190    }
191
192    fn ignore_missing(&self) -> bool {
193        self.ignore_missing
194    }
195
196    fn exec_mut(&self, mut val: Value) -> Result<Value> {
197        for field in self.fields.iter() {
198            let index = field.input_field();
199            match val.get(index) {
200                Some(Value::String(s)) => {
201                    let timestamp = self.parse(s)?;
202                    let output_key = field.target_or_input_field();
203                    val.insert(output_key.to_string(), Value::Timestamp(timestamp))?;
204                }
205                Some(Value::Null) | None => {
206                    if !self.ignore_missing {
207                        return ProcessorMissingFieldSnafu {
208                            processor: self.kind().to_string(),
209                            field: field.input_field().to_string(),
210                        }
211                        .fail();
212                    }
213                }
214                Some(v) => {
215                    return ProcessorExpectStringSnafu {
216                        processor: self.kind().to_string(),
217                        v: v.clone(),
218                    }
219                    .fail();
220                }
221            }
222        }
223        Ok(val)
224    }
225}
226
227/// try to parse val with timezone first, if failed, parse without timezone
228fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
229    if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
230        Ok(dt
231            .timestamp_nanos_opt()
232            .context(DateFailedToGetTimestampSnafu)?)
233    } else {
234        let dt = NaiveDateTime::parse_from_str(val, fmt)
235            .context(DateParseSnafu { value: val })?
236            .and_local_timezone(tz)
237            .single()
238            .context(DateFailedToGetLocalTimezoneSnafu)?;
239        Ok(dt
240            .timestamp_nanos_opt()
241            .context(DateFailedToGetTimestampSnafu)?)
242    }
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};

    /// Inputs that every processor configuration exercised below must accept.
    const SAMPLE_INPUTS: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Asserts that `processor` successfully parses every entry of `SAMPLE_INPUTS`.
    fn assert_parses_all_samples(processor: &DateProcessor) {
        for input in SAMPLE_INPUTS.iter() {
            let parsed = processor.parse(input);
            assert!(parsed.is_ok());
        }
    }

    #[test]
    fn test_try_parse() {
        // 04:34:56 at +00:00 and 13:34:56 Tokyo local time must resolve to the same instant.
        let from_offset = try_parse("2014-5-17T04:34:56+00:00", "%Y-%m-%dT%H:%M:%S%:z", Tokyo);
        assert!(from_offset.is_ok());

        let from_local = try_parse("2014-5-17T13:34:56", "%Y-%m-%dT%H:%M:%S", Tokyo);
        assert!(from_local.is_ok());

        assert_eq!(from_offset.unwrap(), from_local.unwrap());
    }

    #[test]
    fn test_parse() {
        assert_parses_all_samples(&DateProcessor::default());
    }

    #[test]
    fn test_parse_with_formats() {
        let formats: Vec<Arc<String>> = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .iter()
        .map(|fmt| Arc::new(fmt.to_string()))
        .collect();

        let processor = DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        assert_parses_all_samples(&processor);
    }
}