pipeline/etl/processor/
vrl.rs1use std::collections::BTreeMap;
16
17use chrono_tz::Tz;
18use snafu::{OptionExt, ResultExt};
19use vrl::compiler::runtime::Runtime;
20use vrl::compiler::{compile, Program, TargetValue};
21use vrl::diagnostic::Formatter;
22use vrl::prelude::{Bytes, NotNan, TimeZone};
23use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue};
24
25use crate::error::{
26 BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu,
27 InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
28};
29use crate::etl::processor::yaml_string;
30use crate::{PipelineMap, Value as PipelineValue};
31
/// Processor kind name; this is what `Processor::kind` returns for [`VrlProcessor`].
pub(crate) const PROCESSOR_VRL: &str = "vrl";
/// YAML key whose value holds the VRL program text.
const SOURCE: &str = "source";
34
35#[derive(Debug)]
36pub struct VrlProcessor {
37 source: String,
38 program: Program,
39}
40
41impl VrlProcessor {
42 pub fn new(source: String) -> Result<Self> {
43 let fns = vrl::stdlib::all();
44
45 let compile_result = compile(&source, &fns).map_err(|e| {
46 CompileVrlSnafu {
47 msg: Formatter::new(&source, e).to_string(),
48 }
49 .build()
50 })?;
51
52 let program = compile_result.program;
53
54 let result_def = program.final_type_info().result;
56 let kind = result_def.kind();
57 if !kind.is_object() {
58 return VrlReturnValueSnafu.fail();
59 }
60 check_regex_output(kind)?;
61
62 Ok(Self { source, program })
63 }
64
65 pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
66 let pipeline_vrl = m
67 .into_iter()
68 .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
69 .collect::<Result<BTreeMap<_, _>>>()?;
70
71 let mut target = TargetValue {
72 value: VrlValue::Object(pipeline_vrl),
73 metadata: VrlValue::Object(BTreeMap::new()),
74 secrets: Secrets::default(),
75 };
76
77 let timezone = TimeZone::Named(Tz::UTC);
78 let mut runtime = Runtime::default();
79 let re = runtime
80 .resolve(&mut target, &self.program, &timezone)
81 .map_err(|e| {
82 ExecuteVrlSnafu {
83 msg: e.get_expression_error().to_string(),
84 }
85 .build()
86 })?;
87
88 vrl_value_to_pipeline_value(re)
89 }
90}
91
92impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
93 type Error = Error;
94
95 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
96 let mut source = String::new();
97 for (k, v) in value.iter() {
98 let key = k
99 .as_str()
100 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
101 if key == SOURCE {
102 source = yaml_string(v, SOURCE)?;
103 }
104 }
105 let processor = VrlProcessor::new(source)?;
106 Ok(processor)
107 }
108}
109
110impl crate::etl::processor::Processor for VrlProcessor {
111 fn kind(&self) -> &str {
112 PROCESSOR_VRL
113 }
114
115 fn ignore_missing(&self) -> bool {
116 true
117 }
118
119 fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
120 let val = self.resolve(val)?;
121
122 if let PipelineValue::Map(m) = val {
123 Ok(m.values)
124 } else {
125 VrlRegexValueSnafu.fail()
126 }
127 }
128}
129
130fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result<VrlValue> {
131 match v {
132 PipelineValue::Null => Ok(VrlValue::Null),
133 PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)),
134 PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)),
135 PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)),
136 PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)),
137 PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)),
138 PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)),
139 PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)),
140 PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)),
141 PipelineValue::Float32(x) => NotNan::new(x as f64)
142 .map_err(|_| FloatNaNSnafu { input_float: x }.build())
143 .map(VrlValue::Float),
144 PipelineValue::Float64(x) => NotNan::new(x)
145 .map_err(|_| FloatNaNSnafu { input_float: x }.build())
146 .map(VrlValue::Float),
147 PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)),
148 PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))),
149 PipelineValue::Timestamp(x) => x
150 .to_datetime()
151 .context(InvalidTimestampSnafu {
152 input: x.to_string(),
153 })
154 .map(VrlValue::Timestamp),
155 PipelineValue::Array(array) => Ok(VrlValue::Array(
156 array
157 .into_iter()
158 .map(pipeline_value_to_vrl_value)
159 .collect::<Result<Vec<_>>>()?,
160 )),
161 PipelineValue::Map(m) => {
162 let values = m
163 .values
164 .into_iter()
165 .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
166 .collect::<Result<BTreeMap<_, _>>>()?;
167 Ok(VrlValue::Object(values))
168 }
169 }
170}
171
172fn vrl_value_to_pipeline_value(v: VrlValue) -> Result<PipelineValue> {
173 match v {
174 VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec())
175 .context(BytesToUtf8Snafu)
176 .map(PipelineValue::String),
177 VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
178 VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)),
179 VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())),
180 VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)),
181 VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time)
182 .context(InvalidTimestampSnafu {
183 input: date_time.to_string(),
184 })
185 .map(PipelineValue::Timestamp),
186 VrlValue::Object(bm) => {
187 let b = bm
188 .into_iter()
189 .map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v)))
190 .collect::<Result<BTreeMap<String, PipelineValue>>>()?;
191 Ok(PipelineValue::Map(b.into()))
192 }
193 VrlValue::Array(values) => {
194 let a = values
195 .into_iter()
196 .map(vrl_value_to_pipeline_value)
197 .collect::<Result<Vec<_>>>()?;
198 Ok(PipelineValue::Array(a.into()))
199 }
200 VrlValue::Null => Ok(PipelineValue::Null),
201 }
202}
203
204fn check_regex_output(output_kind: &Kind) -> Result<()> {
205 if output_kind.is_regex() {
206 return VrlRegexValueSnafu.fail();
207 }
208
209 if let Some(arr) = output_kind.as_array() {
210 let k = arr.known();
211 for v in k.values() {
212 check_regex_output(v)?
213 }
214 }
215
216 if let Some(obj) = output_kind.as_object() {
217 let k = obj.known();
218 for v in k.values() {
219 check_regex_output(v)?
220 }
221 }
222
223 Ok(())
224}
225
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::value::Timestamp;
    use crate::Map;

    /// Runs a program that moves `.user_info.name` under `.name`, deletes
    /// `.user_info` and stamps the event with `now()`.
    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        let processor =
            VrlProcessor::new(source.to_string()).expect("the VRL program should compile");

        let mut user_info = PipelineMap::new();
        user_info.insert(
            "name".to_string(),
            PipelineValue::String("certain_name".to_string()),
        );

        let mut input = PipelineMap::new();
        input.insert(
            "user_info".to_string(),
            PipelineValue::Map(Map { values: user_info }),
        );

        let output = processor
            .resolve(input)
            .expect("the VRL program should run");
        assert!(matches!(output, PipelineValue::Map(_)));

        let name = output.get("name").expect("`name` should be present");
        assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name"));
        assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name"));

        let timestamp = output
            .get("timestamp")
            .expect("`timestamp` should be present");
        assert!(matches!(
            timestamp,
            PipelineValue::Timestamp(Timestamp::Nanosecond(_))
        ));
    }

    /// Builds the processor from a YAML pipeline definition and checks the
    /// extracted program text.
    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;
        let docs = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();

        // Walk document -> `processors` list -> first entry -> `vrl` hash.
        let root = docs.first().and_then(|doc| doc.as_hash()).unwrap();
        let processors = root
            .get(&yaml_rust::Yaml::String("processors".to_string()))
            .and_then(|list| list.as_vec())
            .unwrap();
        let vrl_processor_yaml = processors
            .first()
            .and_then(|entry| entry.as_hash())
            .and_then(|entry| entry.get(&yaml_rust::Yaml::String("vrl".to_string())))
            .and_then(|config| config.as_hash())
            .unwrap();

        let vrl = VrlProcessor::try_from(vrl_processor_yaml)
            .expect("the YAML config should produce a processor");

        assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
    }

    /// A program whose result contains a regex value must be rejected at
    /// construction time.
    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        assert!(VrlProcessor::new(source.to_string()).is_err());
    }
}