pipeline/etl/processor/
simple_extract.rs1use snafu::OptionExt as _;
16use vrl::value::{KeyString, Value as VrlValue};
17
18use crate::error::{
19 Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
24 IGNORE_MISSING_NAME, KEY_NAME,
25};
26use crate::Processor;
27
/// Kind name of the simple-extract processor; this is the value returned by
/// `SimpleExtractProcessor::kind`.
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
29
/// Processor that pulls a single nested value out of an object-valued field
/// by walking a fixed path of keys.
#[derive(Debug, Default)]
pub struct SimpleExtractProcessor {
    /// Fields to process; each one supplies the input field name to read and
    /// the target (or input) field name the extracted value is written to.
    fields: Fields,
    /// Path segments walked in order through nested objects. Built from the
    /// `key` option by splitting it on `.`.
    key: Vec<String>,
    /// When `true`, an input field that is absent from the value is skipped
    /// instead of producing a missing-field error.
    ignore_missing: bool,
}
38
39impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
40 type Error = Error;
41
42 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
43 let mut fields = Fields::default();
44 let mut ignore_missing = false;
45 let mut keys = vec![];
46
47 for (k, v) in value.iter() {
48 let key = k
49 .as_str()
50 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
51 match key {
52 FIELD_NAME => {
53 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
54 }
55 FIELDS_NAME => {
56 fields = yaml_new_fields(v, FIELDS_NAME)?;
57 }
58 IGNORE_MISSING_NAME => {
59 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
60 }
61 KEY_NAME => {
62 let key_str = yaml_string(v, KEY_NAME)?;
63 keys.extend(key_str.split(".").map(|s| s.to_string()));
64 }
65 _ => {}
66 }
67 }
68
69 let processor = SimpleExtractProcessor {
70 fields,
71 key: keys,
72 ignore_missing,
73 };
74
75 Ok(processor)
76 }
77}
78
79impl SimpleExtractProcessor {
80 fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
81 let mut current = val;
82 for key in self.key.iter() {
83 let VrlValue::Object(map) = current else {
84 return Ok(VrlValue::Null);
85 };
86 let Some(v) = map.get(key.as_str()) else {
87 return Ok(VrlValue::Null);
88 };
89 current = v;
90 }
91 Ok(current.clone())
92 }
93}
94
95impl Processor for SimpleExtractProcessor {
96 fn kind(&self) -> &str {
97 PROCESSOR_SIMPLE_EXTRACT
98 }
99
100 fn ignore_missing(&self) -> bool {
101 self.ignore_missing
102 }
103
104 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
105 for field in self.fields.iter() {
106 let index = field.input_field();
107 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
108 match val.get(index) {
109 Some(v) => {
110 let processed = self.process_field(v)?;
111 let output_index = field.target_or_input_field();
112 val.insert(KeyString::from(output_index), processed);
113 }
114 None => {
115 if !self.ignore_missing {
116 return ProcessorMissingFieldSnafu {
117 processor: self.kind(),
118 field: field.input_field(),
119 }
120 .fail();
121 }
122 }
123 }
124 }
125 Ok(val)
126 }
127}
128
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;

    use super::*;

    /// Builds a bytes value from a string literal.
    fn bytes(text: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(text.to_string()))
    }

    /// Builds an object value holding a single `key -> value` entry.
    fn single_entry(key: &str, value: VrlValue) -> VrlValue {
        VrlValue::Object(BTreeMap::from([(KeyString::from(key), value)]))
    }

    /// Builds a processor that only has its extraction path set.
    fn processor_with_path(path: &[&str]) -> SimpleExtractProcessor {
        SimpleExtractProcessor {
            key: path.iter().map(|segment| segment.to_string()).collect(),
            ..Default::default()
        }
    }

    #[test]
    fn test_simple_extract() {
        let processor = processor_with_path(&["hello"]);

        let result = processor
            .process_field(&single_entry("hello", bytes("world")))
            .unwrap();

        assert_eq!(result, bytes("world"));
    }

    #[test]
    fn test_simple_extract_nested_path() {
        // Every segment of the path descends one object level.
        let processor = processor_with_path(&["outer", "inner"]);
        let input = single_entry("outer", single_entry("inner", bytes("world")));

        let result = processor.process_field(&input).unwrap();

        assert_eq!(result, bytes("world"));
    }

    #[test]
    fn test_simple_extract_unresolvable_path_yields_null() {
        // The requested key is absent from the object.
        let processor = processor_with_path(&["missing"]);
        let result = processor
            .process_field(&single_entry("hello", bytes("world")))
            .unwrap();
        assert_eq!(result, VrlValue::Null);

        // An intermediate node is not an object, so the walk cannot continue.
        let processor = processor_with_path(&["hello", "deeper"]);
        let result = processor
            .process_field(&single_entry("hello", bytes("world")))
            .unwrap();
        assert_eq!(result, VrlValue::Null);
    }

    #[test]
    fn test_simple_extract_empty_path_returns_input() {
        // With no path segments the input value is returned as-is.
        let processor = processor_with_path(&[]);
        let input = single_entry("hello", bytes("world"));

        let result = processor.process_field(&input).unwrap();

        assert_eq!(result, input);
    }
}