pipeline/etl/processor/
vrl_processor.rs1use std::collections::BTreeMap;
16
17use chrono_tz::Tz;
18use snafu::OptionExt;
19use vrl::compiler::runtime::Runtime;
20use vrl::compiler::{compile, Program, TargetValue};
21use vrl::diagnostic::Formatter;
22use vrl::prelude::TimeZone;
23use vrl::value::{Kind, Secrets, Value as VrlValue};
24
25use crate::error::{
26 CompileVrlSnafu, Error, ExecuteVrlSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu,
27 VrlReturnValueSnafu,
28};
29use crate::etl::processor::yaml_string;
30
/// Kind name reported by the VRL processor (returned from `Processor::kind`).
pub(crate) const PROCESSOR_VRL: &str = "vrl";
/// Key in the processor's YAML mapping that holds the VRL program text.
const SOURCE: &str = "source";
33
34#[derive(Debug)]
35pub struct VrlProcessor {
36 source: String,
37 program: Program,
38}
39
40impl VrlProcessor {
41 pub fn new(source: String) -> Result<Self> {
42 let fns = vrl::stdlib::all();
43
44 let compile_result = compile(&source, &fns).map_err(|e| {
45 CompileVrlSnafu {
46 msg: Formatter::new(&source, e).to_string(),
47 }
48 .build()
49 })?;
50
51 let program = compile_result.program;
52
53 let result_def = program.final_type_info().result;
55 let kind = result_def.kind();
56 if !kind.is_object() {
57 return VrlReturnValueSnafu.fail();
58 }
59 check_regex_output(kind)?;
60
61 Ok(Self { source, program })
62 }
63
64 pub fn resolve(&self, value: VrlValue) -> Result<VrlValue> {
65 let mut target = TargetValue {
66 value,
67 metadata: VrlValue::Object(BTreeMap::new()),
68 secrets: Secrets::default(),
69 };
70
71 let timezone = TimeZone::Named(Tz::UTC);
72 let mut runtime = Runtime::default();
73 let re = runtime
74 .resolve(&mut target, &self.program, &timezone)
75 .map_err(|e| {
76 ExecuteVrlSnafu {
77 msg: e.get_expression_error().to_string(),
78 }
79 .build()
80 })?;
81
82 Ok(re)
83 }
84}
85
86impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
87 type Error = Error;
88
89 fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
90 let mut source = String::new();
91 for (k, v) in value.iter() {
92 let key = k
93 .as_str()
94 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
95 if key == SOURCE {
96 source = yaml_string(v, SOURCE)?;
97 }
98 }
99 let processor = VrlProcessor::new(source)?;
100 Ok(processor)
101 }
102}
103
104impl crate::etl::processor::Processor for VrlProcessor {
105 fn kind(&self) -> &str {
106 PROCESSOR_VRL
107 }
108
109 fn ignore_missing(&self) -> bool {
110 true
111 }
112
113 fn exec_mut(&self, val: VrlValue) -> Result<VrlValue> {
114 let val = self.resolve(val)?;
115
116 if let VrlValue::Object(_) = val {
117 Ok(val)
118 } else {
119 VrlRegexValueSnafu.fail()
120 }
121 }
122}
123
124fn check_regex_output(output_kind: &Kind) -> Result<()> {
125 if output_kind.is_regex() {
126 return VrlRegexValueSnafu.fail();
127 }
128
129 if let Some(arr) = output_kind.as_array() {
130 let k = arr.known();
131 for v in k.values() {
132 check_regex_output(v)?
133 }
134 }
135
136 if let Some(obj) = output_kind.as_object() {
137 let k = obj.known();
138 for v in k.values() {
139 check_regex_output(v)?
140 }
141 }
142
143 Ok(())
144}
145
#[cfg(test)]
mod tests {
    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    /// Copies a nested field into two new fields, drops the original object
    /// and adds a timestamp; checks the shape of the resulting object.
    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        let processor =
            VrlProcessor::new(source.to_string()).expect("VRL program should compile");

        let user_info = BTreeMap::from([(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("certain_name")),
        )]);
        let input = BTreeMap::from([(KeyString::from("user_info"), VrlValue::Object(user_info))]);

        let output = processor
            .resolve(VrlValue::Object(input))
            .expect("VRL program should run");
        let output = output.as_object().expect("output should be an object");

        let name = output
            .get("name")
            .expect("output should contain `name`")
            .as_object()
            .expect("`name` should be an object");
        assert!(matches!(name.get("a").unwrap(), VrlValue::Bytes(x) if x == "certain_name"));
        assert!(matches!(name.get("b").unwrap(), VrlValue::Bytes(x) if x == "certain_name"));

        let timestamp = output
            .get("timestamp")
            .expect("output should contain `timestamp`");
        assert!(matches!(timestamp, VrlValue::Timestamp(_)));
    }

    /// Builds a processor from pipeline YAML and checks the literal block is
    /// captured verbatim as the program source.
    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;

        let docs = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
        let vrl_hash = docs[0]["processors"][0]["vrl"]
            .as_hash()
            .expect("`processors[0].vrl` should be a mapping");

        let processor =
            VrlProcessor::try_from(vrl_hash).expect("YAML should build a VRL processor");

        assert_eq!(
            processor.source,
            ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n"
        );
    }

    /// A program whose output keeps a regex value must be rejected at build time.
    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        assert!(VrlProcessor::new(source.to_string()).is_err());
    }
}