pipeline/etl/processor/
vrl_processor.rs1use std::collections::BTreeMap;
16
17use chrono_tz::Tz;
18use snafu::{OptionExt, ensure};
19use vrl::compiler::runtime::Runtime;
20use vrl::compiler::{Program, TargetValue, compile};
21use vrl::diagnostic::Formatter;
22use vrl::prelude::TimeZone;
23use vrl::value::{Kind, Secrets, Value as VrlValue};
24
25use crate::error::{
26 CompileVrlSnafu, Error, ExecuteVrlSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu,
27 VrlReturnValueSnafu,
28};
29use crate::etl::processor::yaml_string;
30
/// Kind name of the VRL processor; returned by `VrlProcessor`'s `Processor::kind`.
pub(crate) const PROCESSOR_VRL: &str = "vrl";
/// YAML key whose value holds the VRL program text.
const SOURCE: &str = "source";
33
34#[derive(Debug)]
35pub struct VrlProcessor {
36 source: String,
37 program: Program,
38}
39
40impl VrlProcessor {
41 pub fn new(source: String) -> Result<Self> {
42 let fns = vrl::stdlib::all();
43
44 let compile_result = compile(&source, &fns).map_err(|e| {
45 CompileVrlSnafu {
46 msg: Formatter::new(&source, e).to_string(),
47 }
48 .build()
49 })?;
50
51 let program = compile_result.program;
52
53 let result_def = program.final_type_info().result;
55 let kind = result_def.kind();
56 ensure!(
60 kind.contains_object() || kind.contains_array(),
61 VrlReturnValueSnafu {
62 result_kind: kind.clone(),
63 }
64 );
65 check_regex_output(kind)?;
66
67 Ok(Self { source, program })
68 }
69
70 pub fn resolve(&self, value: VrlValue) -> Result<VrlValue> {
71 let mut target = TargetValue {
72 value,
73 metadata: VrlValue::Object(BTreeMap::new()),
74 secrets: Secrets::default(),
75 };
76
77 let timezone = TimeZone::Named(Tz::UTC);
78 let mut runtime = Runtime::default();
79 let re = runtime
80 .resolve(&mut target, &self.program, &timezone)
81 .map_err(|e| {
82 ExecuteVrlSnafu {
83 msg: e.get_expression_error().to_string(),
84 }
85 .build()
86 })?;
87
88 Ok(re)
89 }
90}
91
92impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
93 type Error = Error;
94
95 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
96 let mut source = String::new();
97 for (k, v) in value.iter() {
98 let key = k
99 .as_str()
100 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
101 if key == SOURCE {
102 source = yaml_string(v, SOURCE)?;
103 }
104 }
105 let processor = VrlProcessor::new(source)?;
106 Ok(processor)
107 }
108}
109
110impl crate::etl::processor::Processor for VrlProcessor {
111 fn kind(&self) -> &str {
112 PROCESSOR_VRL
113 }
114
115 fn ignore_missing(&self) -> bool {
116 true
117 }
118
119 fn exec_mut(&self, val: VrlValue) -> Result<VrlValue> {
120 self.resolve(val)
121 }
122}
123
124fn check_regex_output(output_kind: &Kind) -> Result<()> {
125 if output_kind.is_regex() {
126 return VrlRegexValueSnafu.fail();
127 }
128
129 if let Some(arr) = output_kind.as_array() {
130 let k = arr.known();
131 for v in k.values() {
132 check_regex_output(v)?
133 }
134 }
135
136 if let Some(obj) = output_kind.as_object() {
137 let k = obj.known();
138 for v in k.values() {
139 check_regex_output(v)?
140 }
141 }
142
143 Ok(())
144}
145
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    /// A program that reshapes the input object and adds a timestamp must
    /// compile and resolve to an object with the expected fields.
    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        let processor =
            VrlProcessor::new(source.to_string()).expect("the VRL program should compile");

        // Input event: { "user_info": { "name": "certain_name" } }
        let user_info = BTreeMap::from([(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("certain_name")),
        )]);
        let event = BTreeMap::from([(KeyString::from("user_info"), VrlValue::Object(user_info))]);

        let output = processor
            .resolve(VrlValue::Object(event))
            .expect("the VRL program should execute");
        let fields = output.as_object().expect("the result should be an object");

        let name = fields
            .get("name")
            .and_then(|v| v.as_object())
            .expect("`name` should be an object");
        let expected = VrlValue::Bytes(Bytes::from("certain_name"));
        for key in ["a", "b"] {
            assert_eq!(name.get(key), Some(&expected));
        }

        assert!(matches!(
            fields.get("timestamp"),
            Some(VrlValue::Timestamp(_))
        ));
    }

    /// The processor can be built from its YAML configuration and keeps the
    /// program text found under the `source` key.
    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;
        let docs = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
        // Indexing a `Yaml` yields `BadValue` on a miss, so a wrong shape
        // surfaces as `None` from `as_hash` and fails the `unwrap` below.
        let vrl_processor_yaml = docs[0]["processors"][0]["vrl"].as_hash().unwrap();

        let vrl = VrlProcessor::try_from(vrl_processor_yaml)
            .expect("the processor should be built from YAML");

        assert_eq!(
            vrl.source,
            ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n"
        );
    }

    /// A program whose resulting object holds a regex value is rejected.
    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        assert!(VrlProcessor::new(source.to_string()).is_err());
    }
}