1use common_time::timestamp::TimeUnit;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19 EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
20 ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
25 IGNORE_MISSING_NAME,
26};
27use crate::etl::value::time::{
28 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
29 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
30 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
31};
32use crate::etl::value::{Timestamp, Value};
33
34pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
35const RESOLUTION_NAME: &str = "resolution";
36
/// Unit in which an epoch value handled by the `epoch` processor is expressed.
///
/// The default is [`Resolution::Milli`].
#[derive(Debug, Default)]
pub(crate) enum Resolution {
    /// Epoch counted in seconds.
    Second,
    /// Epoch counted in milliseconds (the default variant).
    #[default]
    Milli,
    /// Epoch counted in microseconds.
    Micro,
    /// Epoch counted in nanoseconds.
    Nano,
}
45
46impl TryFrom<&str> for Resolution {
47 type Error = Error;
48
49 fn try_from(s: &str) -> Result<Self> {
50 match s {
51 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
52 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
53 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
54 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
55 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
56 }
57 }
58}
59
60impl From<&Resolution> for TimeUnit {
61 fn from(resolution: &Resolution) -> Self {
62 match resolution {
63 Resolution::Second => TimeUnit::Second,
64 Resolution::Milli => TimeUnit::Millisecond,
65 Resolution::Micro => TimeUnit::Microsecond,
66 Resolution::Nano => TimeUnit::Nanosecond,
67 }
68 }
69}
70
71#[derive(Debug, Default)]
75pub struct EpochProcessor {
76 pub(crate) fields: Fields,
77 pub(crate) resolution: Resolution,
78 ignore_missing: bool,
79 }
85
86impl EpochProcessor {
87 fn parse(&self, val: &Value) -> Result<Timestamp> {
88 let t: i64 = match val {
89 Value::String(s) => s
90 .parse::<i64>()
91 .context(FailedToParseIntSnafu { value: s })?,
92 Value::Int16(i) => *i as i64,
93 Value::Int32(i) => *i as i64,
94 Value::Int64(i) => *i,
95 Value::Uint8(i) => *i as i64,
96 Value::Uint16(i) => *i as i64,
97 Value::Uint32(i) => *i as i64,
98 Value::Uint64(i) => *i as i64,
99 Value::Float32(f) => *f as i64,
100 Value::Float64(f) => *f as i64,
101
102 Value::Timestamp(t) => match self.resolution {
103 Resolution::Second => t.timestamp(),
104 Resolution::Milli => t.timestamp_millis(),
105 Resolution::Micro => t.timestamp_micros(),
106 Resolution::Nano => t.timestamp_nanos(),
107 },
108
109 _ => {
110 return ProcessorUnsupportedValueSnafu {
111 processor: PROCESSOR_EPOCH,
112 val: val.to_string(),
113 }
114 .fail();
115 }
116 };
117
118 match self.resolution {
119 Resolution::Second => Ok(Timestamp::Second(t)),
120 Resolution::Milli => Ok(Timestamp::Millisecond(t)),
121 Resolution::Micro => Ok(Timestamp::Microsecond(t)),
122 Resolution::Nano => Ok(Timestamp::Nanosecond(t)),
123 }
124 }
125}
126
127impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
128 type Error = Error;
129
130 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
131 let mut fields = Fields::default();
132 let mut resolution = Resolution::default();
133 let mut ignore_missing = false;
134
135 for (k, v) in hash {
136 let key = k
137 .as_str()
138 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
139
140 match key {
141 FIELD_NAME => {
142 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
143 }
144 FIELDS_NAME => {
145 fields = yaml_new_fields(v, FIELDS_NAME)?;
146 }
147 RESOLUTION_NAME => {
148 let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
149 resolution = s;
150 }
151 IGNORE_MISSING_NAME => {
152 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
153 }
154
155 _ => {}
156 }
157 }
158 let builder = EpochProcessor {
159 fields,
160 resolution,
161 ignore_missing,
162 };
163
164 Ok(builder)
165 }
166}
167
168impl Processor for EpochProcessor {
169 fn kind(&self) -> &str {
170 PROCESSOR_EPOCH
171 }
172
173 fn ignore_missing(&self) -> bool {
174 self.ignore_missing
175 }
176
177 fn exec_mut(&self, mut val: Value) -> Result<Value> {
178 for field in self.fields.iter() {
179 let index = field.input_field();
180 match val.get(index) {
181 Some(Value::Null) | None => {
182 if !self.ignore_missing {
183 return ProcessorMissingFieldSnafu {
184 processor: self.kind(),
185 field: field.input_field(),
186 }
187 .fail();
188 }
189 }
190 Some(v) => {
191 let timestamp = self.parse(v)?;
192 let output_index = field.target_or_input_field();
193 val.insert(output_index.to_string(), Value::Timestamp(timestamp))?;
194 }
195 }
196 }
197 Ok(val)
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::{EpochProcessor, Resolution};
    use crate::etl::value::Value;

    /// String, signed, unsigned and float inputs holding the same epoch must
    /// all parse to the same second-resolution timestamp.
    #[test]
    fn test_parse_epoch() {
        let processor = EpochProcessor {
            resolution: Resolution::Second,
            ..Default::default()
        };
        let expected = super::Timestamp::Second(1573840000);

        let inputs = [
            Value::String("1573840000".into()),
            Value::Int32(1573840000),
            Value::Uint64(1573840000),
            Value::Float32(1573840000.0),
        ];

        for input in &inputs {
            assert_eq!(processor.parse(input).unwrap(), expected);
        }
    }
}