pipeline/etl/processor/
epoch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::{OptionExt, ResultExt};
16
17use crate::error::{
18    EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
19    ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME,
25};
26use crate::etl::value::time::{
27    MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
28    MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
29    SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
30};
31use crate::etl::value::{Timestamp, Value};
32use crate::etl::PipelineMap;
33
/// Kind name of this processor; returned by `Processor::kind` and embedded in
/// the errors it raises.
pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
/// Configuration key that selects the [`Resolution`] of produced timestamps.
const RESOLUTION_NAME: &str = "resolution";

/// Unit in which epoch values are interpreted and timestamps are produced.
///
/// Defaults to [`Resolution::Milli`] when the configuration does not set a
/// resolution.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Resolution {
    /// Whole seconds (`Timestamp::Second`).
    Second,
    /// Milliseconds (`Timestamp::Millisecond`).
    #[default]
    Milli,
    /// Microseconds (`Timestamp::Microsecond`).
    Micro,
    /// Nanoseconds (`Timestamp::Nanosecond`).
    Nano,
}
45
46impl TryFrom<&str> for Resolution {
47    type Error = Error;
48
49    fn try_from(s: &str) -> Result<Self> {
50        match s {
51            SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52            MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53            MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54            NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55            _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56        }
57    }
58}
59
/// Converts epoch values into [`Timestamp`]s.
///
/// Accepts strings holding an integer, integer and float values, and existing
/// timestamps. See `EpochProcessor::parse` for the exact set. Numeric inputs
/// are taken as a raw count of `resolution` units.
///
/// Deprecated: kept only for backward compatibility and expected to be removed
/// in the future.
#[derive(Debug, Default)]
pub struct EpochProcessor {
    /// Input fields to convert. Each result is written under
    /// `field.target_or_input_field()`.
    fields: Fields,
    /// Unit of the produced timestamps.
    resolution: Resolution,
    /// When `true`, missing or `Null` input fields are skipped instead of
    /// raising a missing-field error.
    ignore_missing: bool,
    // NOTE(review): the original notes listed these options, and this
    // processor does not implement any of them: description, if,
    // ignore_failure, on_failure, tag.
}
74
75impl EpochProcessor {
76    fn parse(&self, val: &Value) -> Result<Timestamp> {
77        let t: i64 = match val {
78            Value::String(s) => s
79                .parse::<i64>()
80                .context(FailedToParseIntSnafu { value: s })?,
81            Value::Int16(i) => *i as i64,
82            Value::Int32(i) => *i as i64,
83            Value::Int64(i) => *i,
84            Value::Uint8(i) => *i as i64,
85            Value::Uint16(i) => *i as i64,
86            Value::Uint32(i) => *i as i64,
87            Value::Uint64(i) => *i as i64,
88            Value::Float32(f) => *f as i64,
89            Value::Float64(f) => *f as i64,
90
91            Value::Timestamp(t) => match self.resolution {
92                Resolution::Second => t.timestamp(),
93                Resolution::Milli => t.timestamp_millis(),
94                Resolution::Micro => t.timestamp_micros(),
95                Resolution::Nano => t.timestamp_nanos(),
96            },
97
98            _ => {
99                return ProcessorUnsupportedValueSnafu {
100                    processor: PROCESSOR_EPOCH,
101                    val: val.to_string(),
102                }
103                .fail();
104            }
105        };
106
107        match self.resolution {
108            Resolution::Second => Ok(Timestamp::Second(t)),
109            Resolution::Milli => Ok(Timestamp::Millisecond(t)),
110            Resolution::Micro => Ok(Timestamp::Microsecond(t)),
111            Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
112        }
113    }
114
115    pub(crate) fn target_count(&self) -> usize {
116        self.fields.len()
117    }
118}
119
120impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
121    type Error = Error;
122
123    fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
124        let mut fields = Fields::default();
125        let mut resolution = Resolution::default();
126        let mut ignore_missing = false;
127
128        for (k, v) in hash {
129            let key = k
130                .as_str()
131                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
132
133            match key {
134                FIELD_NAME => {
135                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
136                }
137                FIELDS_NAME => {
138                    fields = yaml_new_fields(v, FIELDS_NAME)?;
139                }
140                RESOLUTION_NAME => {
141                    let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
142                    resolution = s;
143                }
144                IGNORE_MISSING_NAME => {
145                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
146                }
147
148                _ => {}
149            }
150        }
151        let builder = EpochProcessor {
152            fields,
153            resolution,
154            ignore_missing,
155        };
156
157        Ok(builder)
158    }
159}
160
161impl Processor for EpochProcessor {
162    fn kind(&self) -> &str {
163        PROCESSOR_EPOCH
164    }
165
166    fn ignore_missing(&self) -> bool {
167        self.ignore_missing
168    }
169
170    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
171        for field in self.fields.iter() {
172            let index = field.input_field();
173            match val.get(index) {
174                Some(Value::Null) | None => {
175                    if !self.ignore_missing {
176                        return ProcessorMissingFieldSnafu {
177                            processor: self.kind(),
178                            field: field.input_field(),
179                        }
180                        .fail();
181                    }
182                }
183                Some(v) => {
184                    let timestamp = self.parse(v)?;
185                    let output_index = field.target_or_input_field();
186                    val.insert(output_index.to_string(), Value::Timestamp(timestamp));
187                }
188            }
189        }
190        Ok(val)
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::{EpochProcessor, Resolution, Timestamp};
    use crate::etl::value::Value;

    /// Builds a processor with the given resolution and defaults otherwise.
    fn processor_with(resolution: Resolution) -> EpochProcessor {
        EpochProcessor {
            resolution,
            ..Default::default()
        }
    }

    #[test]
    fn test_parse_epoch() {
        let processor = processor_with(Resolution::Second);

        let values = [
            Value::String("1573840000".into()),
            Value::Int32(1573840000),
            Value::Uint64(1573840000),
            Value::Float32(1573840000.0),
        ];

        for value in values {
            let parsed = processor.parse(&value).unwrap();
            assert_eq!(parsed, Timestamp::Second(1573840000));
        }
    }

    #[test]
    fn test_parse_epoch_wraps_raw_value_in_configured_unit() {
        let value = Value::Int64(1_573_840_000_123);

        assert_eq!(
            processor_with(Resolution::Milli).parse(&value).unwrap(),
            Timestamp::Millisecond(1_573_840_000_123)
        );
        assert_eq!(
            processor_with(Resolution::Micro).parse(&value).unwrap(),
            Timestamp::Microsecond(1_573_840_000_123)
        );
        assert_eq!(
            processor_with(Resolution::Nano).parse(&value).unwrap(),
            Timestamp::Nanosecond(1_573_840_000_123)
        );
    }

    #[test]
    fn test_parse_epoch_rejects_invalid_input() {
        let processor = processor_with(Resolution::Second);

        assert!(processor.parse(&Value::Null).is_err());
        assert!(processor.parse(&Value::String("not-a-number".into())).is_err());
        assert!(processor.parse(&Value::String("1.5".into())).is_err());
    }

    #[test]
    fn test_resolution_aliases() {
        use super::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};

        assert!(matches!(
            Resolution::try_from(S_RESOLUTION),
            Ok(Resolution::Second)
        ));
        assert!(matches!(
            Resolution::try_from(MS_RESOLUTION),
            Ok(Resolution::Milli)
        ));
        assert!(matches!(
            Resolution::try_from(US_RESOLUTION),
            Ok(Resolution::Micro)
        ));
        assert!(matches!(
            Resolution::try_from(NS_RESOLUTION),
            Ok(Resolution::Nano)
        ));
        assert!(Resolution::try_from("fortnight").is_err());
    }
}