pipeline/etl/processor/
epoch.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use chrono::{DateTime, Utc};
16use common_time::timestamp::TimeUnit;
17use snafu::{OptionExt, ResultExt};
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21    EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, InvalidEpochForResolutionSnafu,
22    KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
23    ValueMustBeMapSnafu,
24};
25use crate::etl::field::Fields;
26use crate::etl::processor::{
27    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
28    IGNORE_MISSING_NAME,
29};
30use crate::etl::value::{
31    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
32    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
33    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
34};
35
/// Kind name of the epoch processor: returned by `EpochProcessor::kind` and
/// reported in the "unsupported value" error raised while parsing.
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
/// YAML key under which the epoch resolution is configured.
const RESOLUTION_NAME: &str = "resolution";
38
/// Granularity in which a raw epoch number is interpreted.
///
/// The default is [`Resolution::Milli`]. The enum carries no data, so it is
/// cheap to copy and can be compared directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Resolution {
    /// The epoch value counts seconds.
    Second,
    /// The epoch value counts milliseconds (default).
    #[default]
    Milli,
    /// The epoch value counts microseconds.
    Micro,
    /// The epoch value counts nanoseconds.
    Nano,
}
47
48impl std::fmt::Display for Resolution {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        let text = match self {
51            Resolution::Second => SECOND_RESOLUTION,
52            Resolution::Milli => MILLISECOND_RESOLUTION,
53            Resolution::Micro => MICROSECOND_RESOLUTION,
54            Resolution::Nano => NANOSECOND_RESOLUTION,
55        };
56        write!(f, "{}", text)
57    }
58}
59
60impl TryFrom<&str> for Resolution {
61    type Error = Error;
62
63    fn try_from(s: &str) -> Result<Self> {
64        match s {
65            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
66            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
67            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
68            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
69            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
70        }
71    }
72}
73
74impl From<&Resolution> for TimeUnit {
75    fn from(resolution: &Resolution) -> Self {
76        match resolution {
77            Resolution::Second => TimeUnit::Second,
78            Resolution::Milli => TimeUnit::Millisecond,
79            Resolution::Micro => TimeUnit::Microsecond,
80            Resolution::Nano => TimeUnit::Nanosecond,
81        }
82    }
83}
84
85/// support string, integer, float, time, epoch
86/// deprecated it should be removed in the future
87/// Reserved for compatibility only
88#[derive(Debug, Default)]
89pub struct EpochProcessor {
90    pub(crate) fields: Fields,
91    pub(crate) resolution: Resolution,
92    ignore_missing: bool,
93    // description
94    // if
95    // ignore_failure
96    // on_failure
97    // tag
98}
99
100impl EpochProcessor {
101    fn parse(&self, val: &VrlValue) -> Result<DateTime<Utc>> {
102        let t: i64 =
103            match val {
104                VrlValue::Bytes(bytes) => String::from_utf8_lossy(bytes).parse::<i64>().context(
105                    FailedToParseIntSnafu {
106                        value: val.to_string_lossy(),
107                    },
108                )?,
109                VrlValue::Integer(ts) => *ts,
110                VrlValue::Float(not_nan) => not_nan.into_inner() as i64,
111                VrlValue::Timestamp(date_time) => return Ok(*date_time),
112                _ => {
113                    return ProcessorUnsupportedValueSnafu {
114                        processor: PROCESSOR_EPOCH,
115                        val: val.to_string(),
116                    }
117                    .fail();
118                }
119            };
120
121        match self.resolution {
122            Resolution::Second => DateTime::from_timestamp(t, 0),
123            Resolution::Milli => DateTime::from_timestamp_millis(t),
124            Resolution::Micro => DateTime::from_timestamp_micros(t),
125            Resolution::Nano => Some(DateTime::from_timestamp_nanos(t)),
126        }
127        .context(InvalidEpochForResolutionSnafu {
128            value: t,
129            resolution: self.resolution.to_string(),
130        })
131    }
132}
133
134impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
135    type Error = Error;
136
137    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
138        let mut fields = Fields::default();
139        let mut resolution = Resolution::default();
140        let mut ignore_missing = false;
141
142        for (k, v) in hash {
143            let key = k
144                .as_str()
145                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
146
147            match key {
148                FIELD_NAME => {
149                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
150                }
151                FIELDS_NAME => {
152                    fields = yaml_new_fields(v, FIELDS_NAME)?;
153                }
154                RESOLUTION_NAME => {
155                    let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
156                    resolution = s;
157                }
158                IGNORE_MISSING_NAME => {
159                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
160                }
161
162                _ => {}
163            }
164        }
165        let builder = EpochProcessor {
166            fields,
167            resolution,
168            ignore_missing,
169        };
170
171        Ok(builder)
172    }
173}
174
175impl Processor for EpochProcessor {
176    fn kind(&self) -> &str {
177        PROCESSOR_EPOCH
178    }
179
180    fn ignore_missing(&self) -> bool {
181        self.ignore_missing
182    }
183
184    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
185        for field in self.fields.iter() {
186            let index = field.input_field();
187            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
188            match val.get(index) {
189                Some(VrlValue::Null) | None => {
190                    if !self.ignore_missing {
191                        return ProcessorMissingFieldSnafu {
192                            processor: self.kind(),
193                            field: field.input_field(),
194                        }
195                        .fail();
196                    }
197                }
198                Some(v) => {
199                    let timestamp = self.parse(v)?;
200                    let output_index = field.target_or_input_field();
201                    val.insert(
202                        KeyString::from(output_index.to_string()),
203                        VrlValue::Timestamp(timestamp),
204                    );
205                }
206            }
207        }
208        Ok(val)
209    }
210}
211
#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vrl::prelude::Bytes;
    use vrl::value::Value as VrlValue;

    use super::{EpochProcessor, Resolution};

    /// Epoch used by all cases, expressed in seconds.
    const EPOCH_SECONDS: i64 = 1573840000;

    /// Builds a processor that only differs from the default by its resolution.
    fn processor_with(resolution: Resolution) -> EpochProcessor {
        EpochProcessor {
            resolution,
            ..Default::default()
        }
    }

    /// Every supported input kind yields the same timestamp.
    #[test]
    fn test_parse_epoch() {
        let processor = processor_with(Resolution::Second);
        let expected = DateTime::from_timestamp(EPOCH_SECONDS, 0).unwrap();

        let values = [
            VrlValue::Bytes(Bytes::from("1573840000")),
            VrlValue::Integer(EPOCH_SECONDS),
            VrlValue::Float(NotNan::new(1573840000.0).unwrap()),
            // Timestamps are passed through unchanged.
            VrlValue::Timestamp(expected),
        ];

        for value in values {
            let parsed = processor.parse(&value).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    /// The same instant is recovered at every resolution.
    #[test]
    fn test_parse_epoch_resolutions() {
        let expected = DateTime::from_timestamp(EPOCH_SECONDS, 0).unwrap();

        let cases = [
            (Resolution::Second, EPOCH_SECONDS),
            (Resolution::Milli, EPOCH_SECONDS * 1_000),
            (Resolution::Micro, EPOCH_SECONDS * 1_000_000),
            (Resolution::Nano, EPOCH_SECONDS * 1_000_000_000),
        ];

        for (resolution, raw) in cases {
            let processor = processor_with(resolution);
            let parsed = processor.parse(&VrlValue::Integer(raw)).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    /// Non-numeric strings and unsupported value kinds are rejected.
    #[test]
    fn test_parse_epoch_rejects_invalid_input() {
        let processor = processor_with(Resolution::Second);

        assert!(processor
            .parse(&VrlValue::Bytes(Bytes::from("not a number")))
            .is_err());
        assert!(processor.parse(&VrlValue::Boolean(true)).is_err());
    }
}