1use chrono::{DateTime, Utc};
16use common_time::timestamp::TimeUnit;
17use snafu::{OptionExt, ResultExt};
18use vrl::value::{KeyString, Value as VrlValue};
19
20use crate::error::{
21 EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, InvalidEpochForResolutionSnafu,
22 KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
23 ValueMustBeMapSnafu,
24};
25use crate::etl::field::Fields;
26use crate::etl::processor::{
27 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
28 IGNORE_MISSING_NAME,
29};
30use crate::etl::value::{
31 MICROSECOND_RESOLUTION, MICRO_RESOLUTION, MILLISECOND_RESOLUTION, MILLI_RESOLUTION,
32 MS_RESOLUTION, NANOSECOND_RESOLUTION, NANO_RESOLUTION, NS_RESOLUTION, SECOND_RESOLUTION,
33 SEC_RESOLUTION, S_RESOLUTION, US_RESOLUTION,
34};
35
36pub(crate) const PROCESSOR_EPOCH: &str = "epoch";
37const RESOLUTION_NAME: &str = "resolution";
38
39#[derive(Debug, Default)]
40pub(crate) enum Resolution {
41 Second,
42 #[default]
43 Milli,
44 Micro,
45 Nano,
46}
47
48impl std::fmt::Display for Resolution {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 let text = match self {
51 Resolution::Second => SECOND_RESOLUTION,
52 Resolution::Milli => MILLISECOND_RESOLUTION,
53 Resolution::Micro => MICROSECOND_RESOLUTION,
54 Resolution::Nano => NANOSECOND_RESOLUTION,
55 };
56 write!(f, "{}", text)
57 }
58}
59
60impl TryFrom<&str> for Resolution {
61 type Error = Error;
62
63 fn try_from(s: &str) -> Result<Self> {
64 match s {
65 SECOND_RESOLUTION | SEC_RESOLUTION | S_RESOLUTION => Ok(Resolution::Second),
66 MILLISECOND_RESOLUTION | MILLI_RESOLUTION | MS_RESOLUTION => Ok(Resolution::Milli),
67 MICROSECOND_RESOLUTION | MICRO_RESOLUTION | US_RESOLUTION => Ok(Resolution::Micro),
68 NANOSECOND_RESOLUTION | NANO_RESOLUTION | NS_RESOLUTION => Ok(Resolution::Nano),
69 _ => EpochInvalidResolutionSnafu { resolution: s }.fail(),
70 }
71 }
72}
73
74impl From<&Resolution> for TimeUnit {
75 fn from(resolution: &Resolution) -> Self {
76 match resolution {
77 Resolution::Second => TimeUnit::Second,
78 Resolution::Milli => TimeUnit::Millisecond,
79 Resolution::Micro => TimeUnit::Microsecond,
80 Resolution::Nano => TimeUnit::Nanosecond,
81 }
82 }
83}
84
85#[derive(Debug, Default)]
89pub struct EpochProcessor {
90 pub(crate) fields: Fields,
91 pub(crate) resolution: Resolution,
92 ignore_missing: bool,
93 }
99
100impl EpochProcessor {
101 fn parse(&self, val: &VrlValue) -> Result<DateTime<Utc>> {
102 let t: i64 =
103 match val {
104 VrlValue::Bytes(bytes) => String::from_utf8_lossy(bytes).parse::<i64>().context(
105 FailedToParseIntSnafu {
106 value: val.to_string_lossy(),
107 },
108 )?,
109 VrlValue::Integer(ts) => *ts,
110 VrlValue::Float(not_nan) => not_nan.into_inner() as i64,
111 VrlValue::Timestamp(date_time) => return Ok(*date_time),
112 _ => {
113 return ProcessorUnsupportedValueSnafu {
114 processor: PROCESSOR_EPOCH,
115 val: val.to_string(),
116 }
117 .fail();
118 }
119 };
120
121 match self.resolution {
122 Resolution::Second => DateTime::from_timestamp(t, 0),
123 Resolution::Milli => DateTime::from_timestamp_millis(t),
124 Resolution::Micro => DateTime::from_timestamp_micros(t),
125 Resolution::Nano => Some(DateTime::from_timestamp_nanos(t)),
126 }
127 .context(InvalidEpochForResolutionSnafu {
128 value: t,
129 resolution: self.resolution.to_string(),
130 })
131 }
132}
133
134impl TryFrom<&yaml_rust::yaml::Hash> for EpochProcessor {
135 type Error = Error;
136
137 fn try_from(hash: &yaml_rust::yaml::Hash) -> Result<Self> {
138 let mut fields = Fields::default();
139 let mut resolution = Resolution::default();
140 let mut ignore_missing = false;
141
142 for (k, v) in hash {
143 let key = k
144 .as_str()
145 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
146
147 match key {
148 FIELD_NAME => {
149 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
150 }
151 FIELDS_NAME => {
152 fields = yaml_new_fields(v, FIELDS_NAME)?;
153 }
154 RESOLUTION_NAME => {
155 let s = yaml_string(v, RESOLUTION_NAME)?.as_str().try_into()?;
156 resolution = s;
157 }
158 IGNORE_MISSING_NAME => {
159 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
160 }
161
162 _ => {}
163 }
164 }
165 let builder = EpochProcessor {
166 fields,
167 resolution,
168 ignore_missing,
169 };
170
171 Ok(builder)
172 }
173}
174
175impl Processor for EpochProcessor {
176 fn kind(&self) -> &str {
177 PROCESSOR_EPOCH
178 }
179
180 fn ignore_missing(&self) -> bool {
181 self.ignore_missing
182 }
183
184 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
185 for field in self.fields.iter() {
186 let index = field.input_field();
187 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
188 match val.get(index) {
189 Some(VrlValue::Null) | None => {
190 if !self.ignore_missing {
191 return ProcessorMissingFieldSnafu {
192 processor: self.kind(),
193 field: field.input_field(),
194 }
195 .fail();
196 }
197 }
198 Some(v) => {
199 let timestamp = self.parse(v)?;
200 let output_index = field.target_or_input_field();
201 val.insert(
202 KeyString::from(output_index.to_string()),
203 VrlValue::Timestamp(timestamp),
204 );
205 }
206 }
207 }
208 Ok(val)
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use chrono::DateTime;
215 use ordered_float::NotNan;
216 use vrl::prelude::Bytes;
217 use vrl::value::Value as VrlValue;
218
219 use super::EpochProcessor;
220
221 #[test]
222 fn test_parse_epoch() {
223 let processor = EpochProcessor {
224 resolution: super::Resolution::Second,
225 ..Default::default()
226 };
227
228 let values = [
229 VrlValue::Bytes(Bytes::from("1573840000")),
230 VrlValue::Integer(1573840000),
231 VrlValue::Integer(1573840000),
232 VrlValue::Float(NotNan::new(1573840000.0).unwrap()),
233 ];
234
235 for value in values {
236 let parsed = processor.parse(&value).unwrap();
237 assert_eq!(parsed, DateTime::from_timestamp(1573840000, 0).unwrap());
238 }
239 }
240}