pipeline/etl/processor/
vrl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use chrono_tz::Tz;
18use snafu::{OptionExt, ResultExt};
19use vrl::compiler::runtime::Runtime;
20use vrl::compiler::{compile, Program, TargetValue};
21use vrl::diagnostic::Formatter;
22use vrl::prelude::{Bytes, NotNan, TimeZone};
23use vrl::value::{KeyString, Kind, Secrets, Value as VrlValue};
24
25use crate::error::{
26    BytesToUtf8Snafu, CompileVrlSnafu, Error, ExecuteVrlSnafu, FloatNaNSnafu,
27    InvalidTimestampSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu, VrlReturnValueSnafu,
28};
29use crate::etl::processor::yaml_string;
30use crate::{PipelineMap, Value as PipelineValue};
31
/// Processor kind identifier, returned by `Processor::kind` for [`VrlProcessor`].
pub(crate) const PROCESSOR_VRL: &str = "vrl";
/// YAML key whose value holds the VRL program text.
const SOURCE: &str = "source";
34
35#[derive(Debug)]
36pub struct VrlProcessor {
37    source: String,
38    program: Program,
39}
40
41impl VrlProcessor {
42    pub fn new(source: String) -> Result<Self> {
43        let fns = vrl::stdlib::all();
44
45        let compile_result = compile(&source, &fns).map_err(|e| {
46            CompileVrlSnafu {
47                msg: Formatter::new(&source, e).to_string(),
48            }
49            .build()
50        })?;
51
52        let program = compile_result.program;
53
54        // check if the return value is have regex
55        let result_def = program.final_type_info().result;
56        let kind = result_def.kind();
57        if !kind.is_object() {
58            return VrlReturnValueSnafu.fail();
59        }
60        check_regex_output(kind)?;
61
62        Ok(Self { source, program })
63    }
64
65    pub fn resolve(&self, m: PipelineMap) -> Result<PipelineValue> {
66        let pipeline_vrl = m
67            .into_iter()
68            .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
69            .collect::<Result<BTreeMap<_, _>>>()?;
70
71        let mut target = TargetValue {
72            value: VrlValue::Object(pipeline_vrl),
73            metadata: VrlValue::Object(BTreeMap::new()),
74            secrets: Secrets::default(),
75        };
76
77        let timezone = TimeZone::Named(Tz::UTC);
78        let mut runtime = Runtime::default();
79        let re = runtime
80            .resolve(&mut target, &self.program, &timezone)
81            .map_err(|e| {
82                ExecuteVrlSnafu {
83                    msg: e.get_expression_error().to_string(),
84                }
85                .build()
86            })?;
87
88        vrl_value_to_pipeline_value(re)
89    }
90}
91
92impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
93    type Error = Error;
94
95    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
96        let mut source = String::new();
97        for (k, v) in value.iter() {
98            let key = k
99                .as_str()
100                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
101            if key == SOURCE {
102                source = yaml_string(v, SOURCE)?;
103            }
104        }
105        let processor = VrlProcessor::new(source)?;
106        Ok(processor)
107    }
108}
109
110impl crate::etl::processor::Processor for VrlProcessor {
111    fn kind(&self) -> &str {
112        PROCESSOR_VRL
113    }
114
115    fn ignore_missing(&self) -> bool {
116        true
117    }
118
119    fn exec_mut(&self, val: PipelineMap) -> Result<PipelineMap> {
120        let val = self.resolve(val)?;
121
122        if let PipelineValue::Map(m) = val {
123            Ok(m.values)
124        } else {
125            VrlRegexValueSnafu.fail()
126        }
127    }
128}
129
130fn pipeline_value_to_vrl_value(v: PipelineValue) -> Result<VrlValue> {
131    match v {
132        PipelineValue::Null => Ok(VrlValue::Null),
133        PipelineValue::Int8(x) => Ok(VrlValue::Integer(x as i64)),
134        PipelineValue::Int16(x) => Ok(VrlValue::Integer(x as i64)),
135        PipelineValue::Int32(x) => Ok(VrlValue::Integer(x as i64)),
136        PipelineValue::Int64(x) => Ok(VrlValue::Integer(x)),
137        PipelineValue::Uint8(x) => Ok(VrlValue::Integer(x as i64)),
138        PipelineValue::Uint16(x) => Ok(VrlValue::Integer(x as i64)),
139        PipelineValue::Uint32(x) => Ok(VrlValue::Integer(x as i64)),
140        PipelineValue::Uint64(x) => Ok(VrlValue::Integer(x as i64)),
141        PipelineValue::Float32(x) => NotNan::new(x as f64)
142            .map_err(|_| FloatNaNSnafu { input_float: x }.build())
143            .map(VrlValue::Float),
144        PipelineValue::Float64(x) => NotNan::new(x)
145            .map_err(|_| FloatNaNSnafu { input_float: x }.build())
146            .map(VrlValue::Float),
147        PipelineValue::Boolean(x) => Ok(VrlValue::Boolean(x)),
148        PipelineValue::String(x) => Ok(VrlValue::Bytes(Bytes::copy_from_slice(x.as_bytes()))),
149        PipelineValue::Timestamp(x) => x
150            .to_datetime()
151            .context(InvalidTimestampSnafu {
152                input: x.to_string(),
153            })
154            .map(VrlValue::Timestamp),
155        PipelineValue::Array(array) => Ok(VrlValue::Array(
156            array
157                .into_iter()
158                .map(pipeline_value_to_vrl_value)
159                .collect::<Result<Vec<_>>>()?,
160        )),
161        PipelineValue::Map(m) => {
162            let values = m
163                .values
164                .into_iter()
165                .map(|(k, v)| pipeline_value_to_vrl_value(v).map(|v| (KeyString::from(k), v)))
166                .collect::<Result<BTreeMap<_, _>>>()?;
167            Ok(VrlValue::Object(values))
168        }
169    }
170}
171
172fn vrl_value_to_pipeline_value(v: VrlValue) -> Result<PipelineValue> {
173    match v {
174        VrlValue::Bytes(bytes) => String::from_utf8(bytes.to_vec())
175            .context(BytesToUtf8Snafu)
176            .map(PipelineValue::String),
177        VrlValue::Regex(_) => VrlRegexValueSnafu.fail(),
178        VrlValue::Integer(x) => Ok(PipelineValue::Int64(x)),
179        VrlValue::Float(not_nan) => Ok(PipelineValue::Float64(not_nan.into_inner())),
180        VrlValue::Boolean(b) => Ok(PipelineValue::Boolean(b)),
181        VrlValue::Timestamp(date_time) => crate::etl::value::Timestamp::from_datetime(date_time)
182            .context(InvalidTimestampSnafu {
183                input: date_time.to_string(),
184            })
185            .map(PipelineValue::Timestamp),
186        VrlValue::Object(bm) => {
187            let b = bm
188                .into_iter()
189                .map(|(k, v)| vrl_value_to_pipeline_value(v).map(|v| (k.to_string(), v)))
190                .collect::<Result<BTreeMap<String, PipelineValue>>>()?;
191            Ok(PipelineValue::Map(b.into()))
192        }
193        VrlValue::Array(values) => {
194            let a = values
195                .into_iter()
196                .map(vrl_value_to_pipeline_value)
197                .collect::<Result<Vec<_>>>()?;
198            Ok(PipelineValue::Array(a.into()))
199        }
200        VrlValue::Null => Ok(PipelineValue::Null),
201    }
202}
203
204fn check_regex_output(output_kind: &Kind) -> Result<()> {
205    if output_kind.is_regex() {
206        return VrlRegexValueSnafu.fail();
207    }
208
209    if let Some(arr) = output_kind.as_array() {
210        let k = arr.known();
211        for v in k.values() {
212            check_regex_output(v)?
213        }
214    }
215
216    if let Some(obj) = output_kind.as_object() {
217        let k = obj.known();
218        for v in k.values() {
219            check_regex_output(v)?
220        }
221    }
222
223    Ok(())
224}
225
#[cfg(test)]
mod tests {

    use super::*;
    use crate::etl::value::Timestamp;
    use crate::Map;

    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        // `expect` surfaces the actual compile/execute error if the test fails.
        let v = VrlProcessor::new(source.to_string()).expect("VRL source should compile");

        let mut n = PipelineMap::new();
        n.insert(
            "name".to_string(),
            PipelineValue::String("certain_name".to_string()),
        );

        let mut m = PipelineMap::new();
        m.insert(
            "user_info".to_string(),
            PipelineValue::Map(Map { values: n }),
        );

        let re = v.resolve(m).expect("VRL program should execute");

        assert!(matches!(re, PipelineValue::Map(_)));
        let name = re.get("name").expect("`name` should be set");
        assert!(matches!(name.get("a").unwrap(), PipelineValue::String(x) if x == "certain_name"));
        assert!(matches!(name.get("b").unwrap(), PipelineValue::String(x) if x == "certain_name"));
        let timestamp = re.get("timestamp").expect("`timestamp` should be set");
        assert!(matches!(
            timestamp,
            PipelineValue::Timestamp(Timestamp::Nanosecond(_))
        ));
    }

    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;
        let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
        let vrl_processor_yaml = y
            .first()
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string())))
            .and_then(|x| x.as_vec())
            .and_then(|x| x.first())
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string())))
            .and_then(|x| x.as_hash())
            .unwrap();

        let vrl = VrlProcessor::try_from(vrl_processor_yaml)
            .expect("VRL processor should build from YAML");

        assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
    }

    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        let v = VrlProcessor::new(source.to_string());
        assert!(v.is_err());
    }

    #[test]
    fn test_non_object_return() {
        // The result kind of this program is statically an integer, not an object.
        let v = VrlProcessor::new("1".to_string());
        assert!(v.is_err());
    }
}