pipeline/etl/processor/
epoch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_time::timestamp::TimeUnit;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19    EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
20    ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
25    IGNORE_MISSING_NAME,
26};
27use crate::etl::value::time::{
28    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
29    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
30    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
31};
32use crate::etl::value::{Timestamp, Value};
33
/// Name under which this processor is registered; also returned by
/// `EpochProcessor::kind` and used in error messages.
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";

/// YAML key that selects the epoch resolution of the processor.
const RESOLUTION_NAME: &str = "resolution";
36
/// Unit in which an epoch number is interpreted.
///
/// The default is [`Resolution::Milli`]. The enum carries no data, so it is
/// `Copy` and can be compared with `==`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Resolution {
    /// Whole seconds.
    Second,
    /// Milliseconds (the default).
    #[default]
    Milli,
    /// Microseconds.
    Micro,
    /// Nanoseconds.
    Nano,
}
45
46impl TryFrom<&str> for Resolution {
47    type Error = Error;
48
49    fn try_from(s: &str) -> Result<Self> {
50        match s {
51            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56        }
57    }
58}
59
60impl From<&Resolution> for TimeUnit {
61    fn from(resolution: &Resolution) -> Self {
62        match resolution {
63            Resolution::Second => TimeUnit::Second,
64            Resolution::Milli => TimeUnit::Millisecond,
65            Resolution::Micro => TimeUnit::Microsecond,
66            Resolution::Nano => TimeUnit::Nanosecond,
67        }
68    }
69}
70
71/// support string, integer, float, time, epoch
72/// deprecated it should be removed in the future
73/// Reserved for compatibility only
74#[derive(Debug, Default)]
75pub struct EpochProcessor {
76    pub(crate) fields: Fields,
77    pub(crate) resolution: Resolution,
78    ignore_missing: bool,
79    // description
80    // if
81    // ignore_failure
82    // on_failure
83    // tag
84}
85
86impl EpochProcessor {
87    fn parse(&self, val: &Value) -> Result<Timestamp> {
88        let t: i64 = match val {
89            Value::String(s) => s
90                .parse::<i64>()
91                .context(FailedToParseIntSnafu { value: s })?,
92            Value::Int16(i) => *i as i64,
93            Value::Int32(i) => *i as i64,
94            Value::Int64(i) => *i,
95            Value::Uint8(i) => *i as i64,
96            Value::Uint16(i) => *i as i64,
97            Value::Uint32(i) => *i as i64,
98            Value::Uint64(i) => *i as i64,
99            Value::Float32(f) => *f as i64,
100            Value::Float64(f) => *f as i64,
101
102            Value::Timestamp(t) => match self.resolution {
103                Resolution::Second => t.timestamp(),
104                Resolution::Milli => t.timestamp_millis(),
105                Resolution::Micro => t.timestamp_micros(),
106                Resolution::Nano => t.timestamp_nanos(),
107            },
108
109            _ => {
110                return ProcessorUnsupportedValueSnafu {
111                    processor: PROCESSOR_EPOCH,
112                    val: val.to_string(),
113                }
114                .fail();
115            }
116        };
117
118        match self.resolution {
119            Resolution::Second => Ok(Timestamp::Second(t)),
120            Resolution::Milli => Ok(Timestamp::Millisecond(t)),
121            Resolution::Micro => Ok(Timestamp::Microsecond(t)),
122            Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
123        }
124    }
125}
126
127impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
128    type Error = Error;
129
130    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
131        let mut fields = Fields::default();
132        let mut resolution = Resolution::default();
133        let mut ignore_missing = false;
134
135        for (k, v) in hash {
136            let key = k
137                .as_str()
138                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
139
140            match key {
141                FIELD_NAME => {
142                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
143                }
144                FIELDS_NAME => {
145                    fields = yaml_new_fields(v, FIELDS_NAME)?;
146                }
147                RESOLUTION_NAME => {
148                    let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
149                    resolution = s;
150                }
151                IGNORE_MISSING_NAME => {
152                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
153                }
154
155                _ => {}
156            }
157        }
158        let builder = EpochProcessor {
159            fields,
160            resolution,
161            ignore_missing,
162        };
163
164        Ok(builder)
165    }
166}
167
168impl Processor for EpochProcessor {
169    fn kind(&self) -> &str {
170        PROCESSOR_EPOCH
171    }
172
173    fn ignore_missing(&self) -> bool {
174        self.ignore_missing
175    }
176
177    fn exec_mut(&self, mut val: Value) -> Result<Value> {
178        for field in self.fields.iter() {
179            let index = field.input_field();
180            match val.get(index) {
181                Some(Value::Null) | None => {
182                    if !self.ignore_missing {
183                        return ProcessorMissingFieldSnafu {
184                            processor: self.kind(),
185                            field: field.input_field(),
186                        }
187                        .fail();
188                    }
189                }
190                Some(v) => {
191                    let timestamp = self.parse(v)?;
192                    let output_index = field.target_or_input_field();
193                    val.insert(output_index.to_string(), Value::Timestamp(timestamp))?;
194                }
195            }
196        }
197        Ok(val)
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::{EpochProcessor, Resolution, Timestamp};
    use crate::etl::value::Value;

    /// The same instant, given as string, signed/unsigned integer and float,
    /// must parse to one second-resolution timestamp.
    #[test]
    fn test_parse_epoch() {
        let processor = EpochProcessor {
            resolution: Resolution::Second,
            ..Default::default()
        };
        let expected = Timestamp::Second(1573840000);

        let inputs = [
            Value::String("1573840000".into()),
            Value::Int32(1573840000),
            Value::Uint64(1573840000),
            Value::Float32(1573840000.0),
        ];

        for input in &inputs {
            assert_eq!(processor.parse(input).unwrap(), expected);
        }
    }
}
225}