pipeline/etl/processor/
select.rs1use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20 Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
25};
26use crate::Processor;
27
/// Name of this processor kind, as reported by `Processor::kind`.
pub(crate) const PROCESSOR_SELECT: &str = "select";
/// Accepted value of the `type` option that selects [`SelectType::Include`].
const INCLUDE_KEY: &str = "include";
/// Accepted value of the `type` option that selects [`SelectType::Exclude`].
const EXCLUDE_KEY: &str = "exclude";

/// How the `select` processor treats its configured fields.
///
/// Fieldless, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the configured fields (optionally renamed to their target names).
    /// This is the default when no `type` option is given.
    #[default]
    Include,
    /// Drop the configured fields and keep everything else.
    Exclude,
}
38
39impl TryFrom<String> for SelectType {
40 type Error = Error;
41
42 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
43 match value.as_str() {
44 INCLUDE_KEY => Ok(SelectType::Include),
45 EXCLUDE_KEY => Ok(SelectType::Exclude),
46 _ => ProcessorUnsupportedValueSnafu {
47 processor: PROCESSOR_SELECT.to_string(),
48 val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
49 }
50 .fail(),
51 }
52 }
53}
54
55#[derive(Debug, Default)]
56pub struct SelectProcessor {
57 fields: Fields,
58 select_type: SelectType,
59}
60
61impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
62 type Error = Error;
63
64 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
65 let mut fields = Fields::default();
66 let mut select_type = SelectType::default();
67
68 for (k, v) in value.iter() {
69 let key = k
70 .as_str()
71 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
72 match key {
73 FIELD_NAME => {
74 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
75 }
76 FIELDS_NAME => {
77 fields = yaml_new_fields(v, FIELDS_NAME)?;
78 }
79 TYPE_NAME => {
80 select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
81 }
82 _ => {}
83 }
84 }
85
86 Ok(SelectProcessor {
87 fields,
88 select_type,
89 })
90 }
91}
92
93impl Processor for SelectProcessor {
94 fn kind(&self) -> &str {
95 PROCESSOR_SELECT
96 }
97
98 fn ignore_missing(&self) -> bool {
99 true
100 }
101
102 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
103 let v_map = val.as_object_mut().context(ValueMustBeMapSnafu)?;
104
105 match self.select_type {
106 SelectType::Include => {
107 let mut include_key_set = HashSet::with_capacity(v_map.len());
108 for field in self.fields.iter() {
109 let field_name = field.input_field();
111 if let Some(target_name) = field.target_field() {
112 if let Some(v) = v_map.remove(field_name) {
113 v_map.insert(KeyString::from(target_name), v);
114 }
115 include_key_set.insert(target_name);
116 } else {
117 include_key_set.insert(field_name);
118 }
119 }
120 v_map.retain(|k, _| include_key_set.contains(k.as_str()));
121 }
122 SelectType::Exclude => {
123 for field in self.fields.iter() {
124 v_map.remove(field.input_field());
125 }
126 }
127 }
128
129 Ok(val)
130 }
131}
132
133#[cfg(test)]
134mod test {
135 use std::collections::BTreeMap;
136
137 use vrl::prelude::Bytes;
138 use vrl::value::{KeyString, Value as VrlValue};
139
140 use crate::etl::field::{Field, Fields};
141 use crate::etl::processor::select::{SelectProcessor, SelectType};
142 use crate::Processor;
143
144 #[test]
145 fn test_select() {
146 let processor = SelectProcessor {
147 fields: Fields::one(Field::new("hello", None)),
148 select_type: SelectType::Include,
149 };
150
151 let mut p = BTreeMap::new();
152 p.insert(
153 KeyString::from("hello"),
154 VrlValue::Bytes(Bytes::from("world".to_string())),
155 );
156 p.insert(
157 KeyString::from("hello2"),
158 VrlValue::Bytes(Bytes::from("world2".to_string())),
159 );
160
161 let result = processor.exec_mut(VrlValue::Object(p));
162 assert!(result.is_ok());
163 let mut result = result.unwrap();
164 let p = result.as_object_mut().unwrap();
165 assert_eq!(p.len(), 1);
166 assert_eq!(
167 p.get(&KeyString::from("hello")),
168 Some(&VrlValue::Bytes(Bytes::from("world".to_string())))
169 );
170 }
171
172 #[test]
173 fn test_select_with_target() {
174 let processor = SelectProcessor {
175 fields: Fields::one(Field::new("hello", Some("hello3".to_string()))),
176 select_type: SelectType::Include,
177 };
178
179 let mut p = BTreeMap::new();
180 p.insert(
181 KeyString::from("hello"),
182 VrlValue::Bytes(Bytes::from("world".to_string())),
183 );
184 p.insert(
185 KeyString::from("hello2"),
186 VrlValue::Bytes(Bytes::from("world2".to_string())),
187 );
188
189 let result = processor.exec_mut(VrlValue::Object(p));
190 assert!(result.is_ok());
191 let mut result = result.unwrap();
192 let p = result.as_object_mut().unwrap();
193 assert_eq!(p.len(), 1);
194 assert_eq!(
195 p.get(&KeyString::from("hello3")),
196 Some(&VrlValue::Bytes(Bytes::from("world".to_string())))
197 );
198 }
199
200 #[test]
201 fn test_select_with_exclude() {
202 let processor = SelectProcessor {
203 fields: Fields::one(Field::new("hello", None)),
204 select_type: SelectType::Exclude,
205 };
206
207 let mut p = BTreeMap::new();
208 p.insert(
209 KeyString::from("hello"),
210 VrlValue::Bytes(Bytes::from("world".to_string())),
211 );
212 p.insert(
213 KeyString::from("hello2"),
214 VrlValue::Bytes(Bytes::from("world2".to_string())),
215 );
216
217 let result = processor.exec_mut(VrlValue::Object(p));
218 assert!(result.is_ok());
219 let mut result = result.unwrap();
220 let p = result.as_object_mut().unwrap();
221 assert_eq!(p.len(), 1);
222 assert_eq!(p.get(&KeyString::from("hello")), None);
223 assert_eq!(
224 p.get(&KeyString::from("hello2")),
225 Some(&VrlValue::Bytes(Bytes::from("world2".to_string())))
226 );
227 }
228}