pipeline/etl/processor/
epoch.rs1use snafu::{OptionExt, ResultExt};
16
17use crate::error::{
18 EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
19 ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24 IGNORE_MISSING_NAME,
25};
26use crate::etl::value::time::{
27 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
28 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
29 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
30};
31use crate::etl::value::{Timestamp, Value};
32use crate::etl::PipelineMap;
33
34pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
35const RESOLUTION_NAME: &str = "resolution";
36
37#[derive(Debug, Default)]
38enum Resolution {
39 Second,
40 #[default]
41 Milli,
42 Micro,
43 Nano,
44}
45
46impl TryFrom<&str> for Resolution {
47 type Error = Error;
48
49 fn try_from(s: &str) -> Result<Self> {
50 match s {
51 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56 }
57 }
58}
59
60#[derive(Debug, Default)]
64pub struct EpochProcessor {
65 fields: Fields,
66 resolution: Resolution,
67 ignore_missing: bool,
68 }
74
75impl EpochProcessor {
76 fn parse(&self, val: &Value) -> Result<Timestamp> {
77 let t: i64 = match val {
78 Value::String(s) => s
79 .parse::<i64>()
80 .context(FailedToParseIntSnafu { value: s })?,
81 Value::Int16(i) => *i as i64,
82 Value::Int32(i) => *i as i64,
83 Value::Int64(i) => *i,
84 Value::Uint8(i) => *i as i64,
85 Value::Uint16(i) => *i as i64,
86 Value::Uint32(i) => *i as i64,
87 Value::Uint64(i) => *i as i64,
88 Value::Float32(f) => *f as i64,
89 Value::Float64(f) => *f as i64,
90
91 Value::Timestamp(t) => match self.resolution {
92 Resolution::Second => t.timestamp(),
93 Resolution::Milli => t.timestamp_millis(),
94 Resolution::Micro => t.timestamp_micros(),
95 Resolution::Nano => t.timestamp_nanos(),
96 },
97
98 _ => {
99 return ProcessorUnsupportedValueSnafu {
100 processor: PROCESSOR_EPOCH,
101 val: val.to_string(),
102 }
103 .fail();
104 }
105 };
106
107 match self.resolution {
108 Resolution::Second => Ok(Timestamp::Second(t)),
109 Resolution::Milli => Ok(Timestamp::Millisecond(t)),
110 Resolution::Micro => Ok(Timestamp::Microsecond(t)),
111 Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
112 }
113 }
114}
115
116impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
117 type Error = Error;
118
119 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
120 let mut fields = Fields::default();
121 let mut resolution = Resolution::default();
122 let mut ignore_missing = false;
123
124 for (k, v) in hash {
125 let key = k
126 .as_str()
127 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
128
129 match key {
130 FIELD_NAME => {
131 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
132 }
133 FIELDS_NAME => {
134 fields = yaml_new_fields(v, FIELDS_NAME)?;
135 }
136 RESOLUTION_NAME => {
137 let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
138 resolution = s;
139 }
140 IGNORE_MISSING_NAME => {
141 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
142 }
143
144 _ => {}
145 }
146 }
147 let builder = EpochProcessor {
148 fields,
149 resolution,
150 ignore_missing,
151 };
152
153 Ok(builder)
154 }
155}
156
157impl Processor for EpochProcessor {
158 fn kind(&self) -> &str {
159 PROCESSOR_EPOCH
160 }
161
162 fn ignore_missing(&self) -> bool {
163 self.ignore_missing
164 }
165
166 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
167 for field in self.fields.iter() {
168 let index = field.input_field();
169 match val.get(index) {
170 Some(Value::Null) | None => {
171 if !self.ignore_missing {
172 return ProcessorMissingFieldSnafu {
173 processor: self.kind(),
174 field: field.input_field(),
175 }
176 .fail();
177 }
178 }
179 Some(v) => {
180 let timestamp = self.parse(v)?;
181 let output_index = field.target_or_input_field();
182 val.insert(output_index.to_string(), Value::Timestamp(timestamp));
183 }
184 }
185 }
186 Ok(())
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::EpochProcessor;
193 use crate::etl::value::Value;
194
195 #[test]
196 fn test_parse_epoch() {
197 let processor = EpochProcessor {
198 resolution: super::Resolution::Second,
199 ..Default::default()
200 };
201
202 let values = [
203 Value::String("1573840000".into()),
204 Value::Int32(1573840000),
205 Value::Uint64(1573840000),
206 Value::Float32(1573840000.0),
207 ];
208
209 for value in values {
210 let parsed = processor.parse(&value).unwrap();
211 assert_eq!(parsed, super::Timestamp::Second(1573840000));
212 }
213 }
214}