pipeline/etl/processor/vrl_processor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use chrono_tz::Tz;
18use snafu::OptionExt;
19use vrl::compiler::runtime::Runtime;
20use vrl::compiler::{compile, Program, TargetValue};
21use vrl::diagnostic::Formatter;
22use vrl::prelude::TimeZone;
23use vrl::value::{Kind, Secrets, Value as VrlValue};
24
25use crate::error::{
26    CompileVrlSnafu, Error, ExecuteVrlSnafu, KeyMustBeStringSnafu, Result, VrlRegexValueSnafu,
27    VrlReturnValueSnafu,
28};
29use crate::etl::processor::yaml_string;
30
/// Processor kind string; returned by `Processor::kind` for [`VrlProcessor`].
pub(crate) const PROCESSOR_VRL: &str = "vrl";
/// YAML key from which the VRL script text is read when building a [`VrlProcessor`].
const SOURCE: &str = "source";
33
34#[derive(Debug)]
35pub struct VrlProcessor {
36    source: String,
37    program: Program,
38}
39
40impl VrlProcessor {
41    pub fn new(source: String) -> Result<Self> {
42        let fns = vrl::stdlib::all();
43
44        let compile_result = compile(&source, &fns).map_err(|e| {
45            CompileVrlSnafu {
46                msg: Formatter::new(&source, e).to_string(),
47            }
48            .build()
49        })?;
50
51        let program = compile_result.program;
52
53        // check if the return value is have regex
54        let result_def = program.final_type_info().result;
55        let kind = result_def.kind();
56        if !kind.is_object() {
57            return VrlReturnValueSnafu.fail();
58        }
59        check_regex_output(kind)?;
60
61        Ok(Self { source, program })
62    }
63
64    pub fn resolve(&self, value: VrlValue) -> Result<VrlValue> {
65        let mut target = TargetValue {
66            value,
67            metadata: VrlValue::Object(BTreeMap::new()),
68            secrets: Secrets::default(),
69        };
70
71        let timezone = TimeZone::Named(Tz::UTC);
72        let mut runtime = Runtime::default();
73        let re = runtime
74            .resolve(&mut target, &self.program, &timezone)
75            .map_err(|e| {
76                ExecuteVrlSnafu {
77                    msg: e.get_expression_error().to_string(),
78                }
79                .build()
80            })?;
81
82        Ok(re)
83    }
84}
85
86impl TryFrom<&yaml_rust::yaml::Hash> for VrlProcessor {
87    type Error = Error;
88
89    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
90        let mut source = String::new();
91        for (k, v) in value.iter() {
92            let key = k
93                .as_str()
94                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
95            if key == SOURCE {
96                source = yaml_string(v, SOURCE)?;
97            }
98        }
99        let processor = VrlProcessor::new(source)?;
100        Ok(processor)
101    }
102}
103
104impl crate::etl::processor::Processor for VrlProcessor {
105    fn kind(&self) -> &str {
106        PROCESSOR_VRL
107    }
108
109    fn ignore_missing(&self) -> bool {
110        true
111    }
112
113    fn exec_mut(&self, val: VrlValue) -> Result<VrlValue> {
114        let val = self.resolve(val)?;
115
116        if let VrlValue::Object(_) = val {
117            Ok(val)
118        } else {
119            VrlRegexValueSnafu.fail()
120        }
121    }
122}
123
124fn check_regex_output(output_kind: &Kind) -> Result<()> {
125    if output_kind.is_regex() {
126        return VrlRegexValueSnafu.fail();
127    }
128
129    if let Some(arr) = output_kind.as_array() {
130        let k = arr.known();
131        for v in k.values() {
132            check_regex_output(v)?
133        }
134    }
135
136    if let Some(obj) = output_kind.as_object() {
137        let k = obj.known();
138        for v in k.values() {
139            check_regex_output(v)?
140        }
141    }
142
143    Ok(())
144}
145
#[cfg(test)]
mod tests {

    use vrl::prelude::Bytes;
    use vrl::value::KeyString;

    use super::*;

    #[test]
    fn test_vrl() {
        let source = r#"
.name.a = .user_info.name
.name.b = .user_info.name
del(.user_info)
.timestamp = now()
.
"#;

        let processor =
            VrlProcessor::new(source.to_string()).expect("the script should compile");

        let mut user_info = BTreeMap::new();
        user_info.insert(
            KeyString::from("name"),
            VrlValue::Bytes(Bytes::from("certain_name")),
        );

        let mut input = BTreeMap::new();
        input.insert(KeyString::from("user_info"), VrlValue::Object(user_info));

        let output = processor
            .resolve(VrlValue::Object(input))
            .expect("the script should run");
        let output = output
            .as_object()
            .expect("the result should be an object");

        let name = output
            .get("name")
            .and_then(|v| v.as_object())
            .expect("`name` should be an object");
        assert!(matches!(name.get("a"), Some(VrlValue::Bytes(x)) if x == "certain_name"));
        assert!(matches!(name.get("b"), Some(VrlValue::Bytes(x)) if x == "certain_name"));

        assert!(matches!(
            output.get("timestamp"),
            Some(VrlValue::Timestamp(_))
        ));

        // `del(.user_info)` must have removed the original field.
        assert!(output.get("user_info").is_none());
    }

    #[test]
    fn test_yaml_to_vrl() {
        let yaml = r#"
processors:
  - vrl:
      source: |
        .name.a = .user_info.name
        .name.b = .user_info.name
        del(.user_info)
        .timestamp = now()
        .
"#;
        let y = yaml_rust::YamlLoader::load_from_str(yaml).unwrap();
        let vrl_processor_yaml = y
            .first()
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("processors".to_string())))
            .and_then(|x| x.as_vec())
            .and_then(|x| x.first())
            .and_then(|x| x.as_hash())
            .and_then(|x| x.get(&yaml_rust::Yaml::String("vrl".to_string())))
            .and_then(|x| x.as_hash())
            .unwrap();

        let vrl = VrlProcessor::try_from(vrl_processor_yaml)
            .expect("the yaml config should be accepted");

        assert_eq!(vrl.source, ".name.a = .user_info.name\n.name.b = .user_info.name\ndel(.user_info)\n.timestamp = now()\n.\n");
    }

    #[test]
    fn test_regex() {
        let source = r#"
.re = r'(?i)^Hello, World!$'
del(.re)
.re = r'(?i)^Hello, World!$'
.
"#;

        // Pin the specific rejection reason so an unrelated failure (e.g. a
        // compile error) cannot make this test pass.
        let err = VrlProcessor::new(source.to_string())
            .expect_err("a script returning a regex value must be rejected");
        assert!(
            matches!(err, Error::VrlRegexValue { .. }),
            "expected a regex-value error, got {err:?}"
        );
    }

    #[test]
    fn test_non_object_return() {
        let source = r#"
.status = 200
.status
"#;

        let err = VrlProcessor::new(source.to_string())
            .expect_err("a script that does not return an object must be rejected");
        assert!(
            matches!(err, Error::VrlReturnValue { .. }),
            "expected a return-value error, got {err:?}"
        );
    }
}