pipeline/etl/processor/
epoch.rs1use snafu::{OptionExt, ResultExt};
16
17use crate::error::{
18 EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
19 ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
24 IGNORE_MISSING_NAME,
25};
26use crate::etl::value::time::{
27 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
28 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
29 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
30};
31use crate::etl::value::{Timestamp, Value};
32use crate::etl::PipelineMap;
33
34pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
35const RESOLUTION_NAME: &str = "resolution";
36
37#[derive(Debug, Default)]
38enum Resolution {
39 Second,
40 #[default]
41 Milli,
42 Micro,
43 Nano,
44}
45
46impl TryFrom<&str> for Resolution {
47 type Error = Error;
48
49 fn try_from(s: &str) -> Result<Self> {
50 match s {
51 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56 }
57 }
58}
59
60#[derive(Debug, Default)]
64pub struct EpochProcessor {
65 fields: Fields,
66 resolution: Resolution,
67 ignore_missing: bool,
68 }
74
75impl EpochProcessor {
76 fn parse(&self, val: &Value) -> Result<Timestamp> {
77 let t: i64 = match val {
78 Value::String(s) => s
79 .parse::<i64>()
80 .context(FailedToParseIntSnafu { value: s })?,
81 Value::Int16(i) => *i as i64,
82 Value::Int32(i) => *i as i64,
83 Value::Int64(i) => *i,
84 Value::Uint8(i) => *i as i64,
85 Value::Uint16(i) => *i as i64,
86 Value::Uint32(i) => *i as i64,
87 Value::Uint64(i) => *i as i64,
88 Value::Float32(f) => *f as i64,
89 Value::Float64(f) => *f as i64,
90
91 Value::Timestamp(t) => match self.resolution {
92 Resolution::Second => t.timestamp(),
93 Resolution::Milli => t.timestamp_millis(),
94 Resolution::Micro => t.timestamp_micros(),
95 Resolution::Nano => t.timestamp_nanos(),
96 },
97
98 _ => {
99 return ProcessorUnsupportedValueSnafu {
100 processor: PROCESSOR_EPOCH,
101 val: val.to_string(),
102 }
103 .fail();
104 }
105 };
106
107 match self.resolution {
108 Resolution::Second => Ok(Timestamp::Second(t)),
109 Resolution::Milli => Ok(Timestamp::Millisecond(t)),
110 Resolution::Micro => Ok(Timestamp::Microsecond(t)),
111 Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
112 }
113 }
114
115 pub(crate) fn target_count(&self) -> usize {
116 self.fields.len()
117 }
118}
119
120impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
121 type Error = Error;
122
123 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
124 let mut fields = Fields::default();
125 let mut resolution = Resolution::default();
126 let mut ignore_missing = false;
127
128 for (k, v) in hash {
129 let key = k
130 .as_str()
131 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
132
133 match key {
134 FIELD_NAME => {
135 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
136 }
137 FIELDS_NAME => {
138 fields = yaml_new_fields(v, FIELDS_NAME)?;
139 }
140 RESOLUTION_NAME => {
141 let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
142 resolution = s;
143 }
144 IGNORE_MISSING_NAME => {
145 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
146 }
147
148 _ => {}
149 }
150 }
151 let builder = EpochProcessor {
152 fields,
153 resolution,
154 ignore_missing,
155 };
156
157 Ok(builder)
158 }
159}
160
161impl Processor for EpochProcessor {
162 fn kind(&self) -> &str {
163 PROCESSOR_EPOCH
164 }
165
166 fn ignore_missing(&self) -> bool {
167 self.ignore_missing
168 }
169
170 fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
171 for field in self.fields.iter() {
172 let index = field.input_field();
173 match val.get(index) {
174 Some(Value::Null) | None => {
175 if !self.ignore_missing {
176 return ProcessorMissingFieldSnafu {
177 processor: self.kind(),
178 field: field.input_field(),
179 }
180 .fail();
181 }
182 }
183 Some(v) => {
184 let timestamp = self.parse(v)?;
185 let output_index = field.target_or_input_field();
186 val.insert(output_index.to_string(), Value::Timestamp(timestamp));
187 }
188 }
189 }
190 Ok(val)
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use super::EpochProcessor;
197 use crate::etl::value::Value;
198
199 #[test]
200 fn test_parse_epoch() {
201 let processor = EpochProcessor {
202 resolution: super::Resolution::Second,
203 ..Default::default()
204 };
205
206 let values = [
207 Value::String("1573840000".into()),
208 Value::Int32(1573840000),
209 Value::Uint64(1573840000),
210 Value::Float32(1573840000.0),
211 ];
212
213 for value in values {
214 let parsed = processor.parse(&value).unwrap();
215 assert_eq!(parsed, super::Timestamp::Second(1573840000));
216 }
217 }
218}