pipeline/etl/processor/
date.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use chrono::{DateTime, NaiveDateTime};
18use chrono_tz::Tz;
19use lazy_static::lazy_static;
20use snafu::{OptionExt, ResultExt};
21
22use crate::error::{
23    DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
24    DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
25    ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,
26};
27use crate::etl::field::Fields;
28use crate::etl::processor::{
29    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, yaml_strings, Processor, FIELDS_NAME,
30    FIELD_NAME, IGNORE_MISSING_NAME,
31};
32use crate::etl::value::{Timestamp, Value};
33use crate::etl::PipelineMap;
34
/// Name under which this processor appears in pipeline definitions.
pub(crate) const PROCESSOR_DATE: &str = "date";

/// Option key listing the input formats; the RFC3339-style defaults apply when absent or empty.
const FORMATS_NAME: &str = "formats";
/// Option key naming the timezone used for inputs without an offset; UTC when absent.
const TIMEZONE_NAME: &str = "timezone";
/// Option key for the locale (stored on the processor, not used when parsing).
const LOCALE_NAME: &str = "locale";
/// Option key for the output format (intended to default to the input format).
const OUTPUT_FORMAT_NAME: &str = "output_format";
41
42lazy_static! {
43    static ref DEFAULT_FORMATS: Vec<Arc<String>> = vec![
44                    // timezone with colon
45                    "%Y-%m-%dT%H:%M:%S%:z",
46                    "%Y-%m-%dT%H:%M:%S%.3f%:z",
47                    "%Y-%m-%dT%H:%M:%S%.6f%:z",
48                    "%Y-%m-%dT%H:%M:%S%.9f%:z",
49                    // timezone without colon
50                    "%Y-%m-%dT%H:%M:%S%z",
51                    "%Y-%m-%dT%H:%M:%S%.3f%z",
52                    "%Y-%m-%dT%H:%M:%S%.6f%z",
53                    "%Y-%m-%dT%H:%M:%S%.9f%z",
54                    // without timezone
55                    "%Y-%m-%dT%H:%M:%SZ",
56                    "%Y-%m-%dT%H:%M:%S",
57                    "%Y-%m-%dT%H:%M:%S%.3f",
58                    "%Y-%m-%dT%H:%M:%S%.6f",
59                    "%Y-%m-%dT%H:%M:%S%.9f",
60                ]
61                .iter()
62                .map(|s| Arc::new(s.to_string()))
63                .collect();
64}
65
66#[derive(Debug)]
67struct Formats(Vec<Arc<String>>);
68
69impl Default for Formats {
70    fn default() -> Self {
71        Formats(DEFAULT_FORMATS.clone())
72    }
73}
74
75impl Formats {
76    fn new(mut formats: Vec<Arc<String>>) -> Self {
77        formats.sort();
78        formats.dedup();
79        Formats(formats)
80    }
81}
82
83impl std::ops::Deref for Formats {
84    type Target = Vec<Arc<String>>;
85
86    fn deref(&self) -> &Self::Target {
87        &self.0
88    }
89}
90
91impl TryFrom<&yaml_rust::yaml::Hash> for DateProcessor {
92    type Error = Error;
93
94    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
95        let mut fields = Fields::default();
96        let mut formats = Formats::default();
97        let mut timezone = None;
98        let mut locale = None;
99        let mut ignore_missing = false;
100
101        for (k, v) in hash {
102            let key = k
103                .as_str()
104                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
105
106            match key {
107                FIELD_NAME => {
108                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
109                }
110                FIELDS_NAME => {
111                    fields = yaml_new_fields(v, FIELDS_NAME)?;
112                }
113
114                FORMATS_NAME => {
115                    let format_strs = yaml_strings(v, FORMATS_NAME)?;
116                    if format_strs.is_empty() {
117                        formats = Formats::new(DEFAULT_FORMATS.clone());
118                    } else {
119                        formats = Formats::new(format_strs.into_iter().map(Arc::new).collect());
120                    }
121                }
122                TIMEZONE_NAME => {
123                    timezone = Some(Arc::new(yaml_string(v, TIMEZONE_NAME)?));
124                }
125                LOCALE_NAME => {
126                    locale = Some(Arc::new(yaml_string(v, LOCALE_NAME)?));
127                }
128                IGNORE_MISSING_NAME => {
129                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
130                }
131
132                _ => {}
133            }
134        }
135
136        let builder = DateProcessor {
137            fields,
138            formats,
139            timezone,
140            locale,
141            ignore_missing,
142        };
143
144        Ok(builder)
145    }
146}
147
148/// deprecated it should be removed in the future
149/// Reserved for compatibility only
150#[derive(Debug, Default)]
151pub struct DateProcessor {
152    fields: Fields,
153    formats: Formats,
154    timezone: Option<Arc<String>>,
155    locale: Option<Arc<String>>, // to support locale
156
157    ignore_missing: bool,
158    // description
159    // if
160    // ignore_failure
161    // on_failure
162    // tag
163}
164
165impl DateProcessor {
166    pub(crate) fn target_count(&self) -> usize {
167        self.fields.len()
168    }
169
170    fn parse(&self, val: &str) -> Result<Timestamp> {
171        let mut tz = Tz::UTC;
172        if let Some(timezone) = &self.timezone {
173            tz = timezone.parse::<Tz>().context(DateParseTimezoneSnafu {
174                value: timezone.as_ref(),
175            })?;
176        }
177
178        for fmt in self.formats.iter() {
179            if let Ok(ns) = try_parse(val, fmt, tz) {
180                return Ok(Timestamp::Nanosecond(ns));
181            }
182        }
183
184        ProcessorFailedToParseStringSnafu {
185            kind: PROCESSOR_DATE.to_string(),
186            value: val.to_string(),
187        }
188        .fail()
189    }
190}
191
192impl Processor for DateProcessor {
193    fn kind(&self) -> &str {
194        PROCESSOR_DATE
195    }
196
197    fn ignore_missing(&self) -> bool {
198        self.ignore_missing
199    }
200
201    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
202        for field in self.fields.iter() {
203            let index = field.input_field();
204            match val.get(index) {
205                Some(Value::String(s)) => {
206                    let timestamp = self.parse(s)?;
207                    let output_key = field.target_or_input_field();
208                    val.insert(output_key.to_string(), Value::Timestamp(timestamp));
209                }
210                Some(Value::Null) | None => {
211                    if !self.ignore_missing {
212                        return ProcessorMissingFieldSnafu {
213                            processor: self.kind().to_string(),
214                            field: field.input_field().to_string(),
215                        }
216                        .fail();
217                    }
218                }
219                Some(v) => {
220                    return ProcessorExpectStringSnafu {
221                        processor: self.kind().to_string(),
222                        v: v.clone(),
223                    }
224                    .fail();
225                }
226            }
227        }
228        Ok(val)
229    }
230}
231
232/// try to parse val with timezone first, if failed, parse without timezone
233fn try_parse(val: &str, fmt: &str, tz: Tz) -> Result<i64> {
234    if let Ok(dt) = DateTime::parse_from_str(val, fmt) {
235        Ok(dt
236            .timestamp_nanos_opt()
237            .context(DateFailedToGetTimestampSnafu)?)
238    } else {
239        let dt = NaiveDateTime::parse_from_str(val, fmt)
240            .context(DateParseSnafu { value: val })?
241            .and_local_timezone(tz)
242            .single()
243            .context(DateFailedToGetLocalTimezoneSnafu)?;
244        Ok(dt
245            .timestamp_nanos_opt()
246            .context(DateFailedToGetTimestampSnafu)?)
247    }
248}
249
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono_tz::Asia::Tokyo;

    use crate::etl::processor::date::{try_parse, DateProcessor};
    use crate::etl::value::Timestamp;

    /// Inputs covering the naive, `Z`, colon-offset and compact-offset shapes,
    /// with and without fractional seconds.
    const SAMPLES: [&str; 6] = [
        "2014-5-17T12:34:56",
        "2014-5-17T12:34:56Z",
        "2014-5-17T12:34:56+09:30",
        "2014-5-17T12:34:56.000+09:30",
        "2014-5-17T12:34:56-0930",
        "2014-5-17T12:34:56.000-0930",
    ];

    /// Extracts the nanosecond value the date processor is expected to produce.
    fn nanos(ts: Timestamp) -> i64 {
        match ts {
            Timestamp::Nanosecond(ns) => ns,
            _ => panic!("date processor is expected to yield nanosecond timestamps"),
        }
    }

    #[test]
    fn test_try_parse() {
        let time_with_tz = "2014-5-17T04:34:56+00:00";
        let fmt_with_tz = "%Y-%m-%dT%H:%M:%S%:z";

        let time_without_tz = "2014-5-17T13:34:56";
        let fmt_without_tz = "%Y-%m-%dT%H:%M:%S";

        let parsed_with_tz = try_parse(time_with_tz, fmt_with_tz, Tokyo).unwrap();
        let parsed_without_tz = try_parse(time_without_tz, fmt_without_tz, Tokyo).unwrap();

        // 2014-05-17T04:34:56Z
        assert_eq!(parsed_with_tz, 1_400_301_296_000_000_000);
        assert_eq!(parsed_with_tz, parsed_without_tz);
    }

    #[test]
    fn test_parse() {
        let processor = DateProcessor::default();

        for value in SAMPLES {
            assert!(processor.parse(value).is_ok(), "failed to parse {value}");
        }

        // Without a configured timezone, naive inputs are read as UTC.
        let naive = nanos(processor.parse("2014-5-17T12:34:56").unwrap());
        let explicit = nanos(processor.parse("2014-5-17T12:34:56+00:00").unwrap());
        assert_eq!(naive, explicit);
    }

    #[test]
    fn test_parse_with_formats() {
        let formats = [
            "%Y-%m-%dT%H:%M:%S%:z",
            "%Y-%m-%dT%H:%M:%S%.3f%:z",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
        ]
        .into_iter()
        .map(|s| Arc::new(s.to_string()))
        .collect::<Vec<_>>();
        let processor = DateProcessor {
            formats: super::Formats(formats),
            ..Default::default()
        };

        for value in SAMPLES {
            assert!(processor.parse(value).is_ok(), "failed to parse {value}");
        }
    }

    #[test]
    fn test_parse_with_timezone() {
        let processor = DateProcessor {
            timezone: Some(Arc::new("Asia/Tokyo".to_string())),
            ..Default::default()
        };

        for value in SAMPLES {
            assert!(processor.parse(value).is_ok(), "failed to parse {value}");
        }

        // A naive input is read as Tokyo wall-clock time (UTC+09:00)...
        let naive = nanos(processor.parse("2014-5-17T12:34:56").unwrap());
        let tokyo = nanos(processor.parse("2014-5-17T12:34:56+09:00").unwrap());
        assert_eq!(naive, tokyo);

        // ...while an explicit offset takes precedence over the configured timezone.
        let offset = nanos(processor.parse("2014-5-17T12:34:56+09:30").unwrap());
        assert_eq!(tokyo - offset, 30 * 60 * 1_000_000_000);
    }
}