pipeline/etl/processor/
epoch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::{OptionExt, ResultExt};
16
17use crate::error::{
18    EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
19    ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME,
25};
26use crate::etl::value::time::{
27    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
28    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
29    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
30};
31use crate::etl::value::{Timestamp, Value};
32use crate::etl::PipelineMap;
33
/// Name under which this processor is registered; also returned by
/// `EpochProcessor::kind` and reported in this processor's error messages.
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
/// YAML option key that selects the unit of the produced timestamp.
const RESOLUTION_NAME: &str = "resolution";
36
/// Unit in which an epoch value is interpreted and emitted.
///
/// Defaults to [`Resolution::Milli`]. The enum carries no data, so it is
/// `Copy` and comparable, which lets callers match on or compare it without
/// borrowing.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    /// Whole seconds.
    Second,
    /// Milliseconds (the default).
    #[default]
    Milli,
    /// Microseconds.
    Micro,
    /// Nanoseconds.
    Nano,
}
45
46impl TryFrom<&str> for Resolution {
47    type Error = Error;
48
49    fn try_from(s: &str) -> Result<Self> {
50        match s {
51            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56        }
57    }
58}
59
60/// support string, integer, float, time, epoch
61/// deprecated it should be removed in the future
62/// Reserved for compatibility only
63#[derive(Debug, Default)]
64pub struct EpochProcessor {
65    fields: Fields,
66    resolution: Resolution,
67    ignore_missing: bool,
68    // description
69    // if
70    // ignore_failure
71    // on_failure
72    // tag
73}
74
75impl EpochProcessor {
76    fn parse(&self, val: &Value) -> Result<Timestamp> {
77        let t: i64 = match val {
78            Value::String(s) => s
79                .parse::<i64>()
80                .context(FailedToParseIntSnafu { value: s })?,
81            Value::Int16(i) => *i as i64,
82            Value::Int32(i) => *i as i64,
83            Value::Int64(i) => *i,
84            Value::Uint8(i) => *i as i64,
85            Value::Uint16(i) => *i as i64,
86            Value::Uint32(i) => *i as i64,
87            Value::Uint64(i) => *i as i64,
88            Value::Float32(f) => *f as i64,
89            Value::Float64(f) => *f as i64,
90
91            Value::Timestamp(t) => match self.resolution {
92                Resolution::Second => t.timestamp(),
93                Resolution::Milli => t.timestamp_millis(),
94                Resolution::Micro => t.timestamp_micros(),
95                Resolution::Nano => t.timestamp_nanos(),
96            },
97
98            _ => {
99                return ProcessorUnsupportedValueSnafu {
100                    processor: PROCESSOR_EPOCH,
101                    val: val.to_string(),
102                }
103                .fail();
104            }
105        };
106
107        match self.resolution {
108            Resolution::Second => Ok(Timestamp::Second(t)),
109            Resolution::Milli => Ok(Timestamp::Millisecond(t)),
110            Resolution::Micro => Ok(Timestamp::Microsecond(t)),
111            Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
112        }
113    }
114}
115
116impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
117    type Error = Error;
118
119    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
120        let mut fields = Fields::default();
121        let mut resolution = Resolution::default();
122        let mut ignore_missing = false;
123
124        for (k, v) in hash {
125            let key = k
126                .as_str()
127                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
128
129            match key {
130                FIELD_NAME => {
131                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
132                }
133                FIELDS_NAME => {
134                    fields = yaml_new_fields(v, FIELDS_NAME)?;
135                }
136                RESOLUTION_NAME => {
137                    let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
138                    resolution = s;
139                }
140                IGNORE_MISSING_NAME => {
141                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
142                }
143
144                _ => {}
145            }
146        }
147        let builder = EpochProcessor {
148            fields,
149            resolution,
150            ignore_missing,
151        };
152
153        Ok(builder)
154    }
155}
156
157impl Processor for EpochProcessor {
158    fn kind(&self) -> &str {
159        PROCESSOR_EPOCH
160    }
161
162    fn ignore_missing(&self) -> bool {
163        self.ignore_missing
164    }
165
166    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
167        for field in self.fields.iter() {
168            let index = field.input_field();
169            match val.get(index) {
170                Some(Value::Null) | None => {
171                    if !self.ignore_missing {
172                        return ProcessorMissingFieldSnafu {
173                            processor: self.kind(),
174                            field: field.input_field(),
175                        }
176                        .fail();
177                    }
178                }
179                Some(v) => {
180                    let timestamp = self.parse(v)?;
181                    let output_index = field.target_or_input_field();
182                    val.insert(output_index.to_string(), Value::Timestamp(timestamp));
183                }
184            }
185        }
186        Ok(())
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::{EpochProcessor, Resolution, Timestamp};
    use crate::etl::value::Value;

    /// The same instant given as a string, a signed int, an unsigned int and
    /// a float must all parse to the identical second-resolution timestamp.
    #[test]
    fn test_parse_epoch() {
        let expected = Timestamp::Second(1573840000);
        let processor = EpochProcessor {
            resolution: Resolution::Second,
            ..Default::default()
        };

        let inputs = [
            Value::String("1573840000".into()),
            Value::Int32(1573840000),
            Value::Uint64(1573840000),
            Value::Float32(1573840000.0),
        ];

        for input in &inputs {
            assert_eq!(processor.parse(input).unwrap(), expected);
        }
    }
}