pipeline/etl/processor/
select.rs1use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17
18use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
19use crate::etl::field::Fields;
20use crate::etl::processor::{
21 yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
22};
23use crate::{PipelineMap, Processor};
24
/// Kind string of the select processor; returned by `SelectProcessor::kind` and
/// embedded in its error reports.
pub(crate) const PROCESSOR_SELECT: &str = "select";

/// `type` value meaning "keep only the listed fields".
const INCLUDE_KEY: &str = "include";
/// `type` value meaning "drop the listed fields".
const EXCLUDE_KEY: &str = "exclude";
28
/// Whether a select processor keeps or removes the fields it lists.
///
/// This is a fieldless two-variant enum, so it is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the listed fields (the default when no `type` is configured).
    #[default]
    Include,
    /// Remove the listed fields and keep everything else.
    Exclude,
}
35
36impl TryFrom<String> for SelectType {
37 type Error = Error;
38
39 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
40 match value.as_str() {
41 INCLUDE_KEY => Ok(SelectType::Include),
42 EXCLUDE_KEY => Ok(SelectType::Exclude),
43 _ => ProcessorUnsupportedValueSnafu {
44 processor: PROCESSOR_SELECT.to_string(),
45 val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
46 }
47 .fail(),
48 }
49 }
50}
51
/// Pipeline processor that keeps (`Include`) or drops (`Exclude`) a set of fields of
/// the input map.
///
/// Constructed from a YAML mapping through `TryFrom<&yaml_rust::yaml::Hash>` and run
/// through `Processor::exec_mut`.
#[derive(Debug, Default)]
pub struct SelectProcessor {
    /// Fields to keep or drop. In `Include` mode a field's target name, when present,
    /// is the key the kept value is moved to; in `Exclude` mode only the input name
    /// is used.
    fields: Fields,
    /// Whether `fields` lists the keys to keep or the keys to remove.
    select_type: SelectType,
}
57
58impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
59 type Error = Error;
60
61 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
62 let mut fields = Fields::default();
63 let mut select_type = SelectType::default();
64
65 for (k, v) in value.iter() {
66 let key = k
67 .as_str()
68 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
69 match key {
70 FIELD_NAME => {
71 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
72 }
73 FIELDS_NAME => {
74 fields = yaml_new_fields(v, FIELDS_NAME)?;
75 }
76 TYPE_NAME => {
77 select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
78 }
79 _ => {}
80 }
81 }
82
83 Ok(SelectProcessor {
84 fields,
85 select_type,
86 })
87 }
88}
89
90impl Processor for SelectProcessor {
91 fn kind(&self) -> &str {
92 PROCESSOR_SELECT
93 }
94
95 fn ignore_missing(&self) -> bool {
96 true
97 }
98
99 fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
100 match self.select_type {
101 SelectType::Include => {
102 let mut include_key_set = HashSet::with_capacity(val.len());
103 for field in self.fields.iter() {
104 let field_name = field.input_field();
106 if let Some(target_name) = field.target_field() {
107 if let Some(v) = val.remove(field_name) {
108 val.insert(target_name.to_string(), v);
109 }
110 include_key_set.insert(target_name);
111 } else {
112 include_key_set.insert(field_name);
113 }
114 }
115 val.retain(|k, _| include_key_set.contains(k.as_str()));
116 }
117 SelectType::Exclude => {
118 for field in self.fields.iter() {
119 val.remove(field.input_field());
120 }
121 }
122 }
123
124 Ok(val)
125 }
126}
127
#[cfg(test)]
mod test {
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::select::{SelectProcessor, SelectType};
    use crate::{PipelineMap, Processor, Value};

    /// Shorthand for building a `Value::String`.
    fn string_value(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// Runs a processor configured with the single field `hello` (optionally renamed
    /// to `target`) over `{hello: "world", hello2: "world2"}` and returns its output.
    fn run_select(target: Option<String>, select_type: SelectType) -> PipelineMap {
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", target)),
            select_type,
        };

        let mut input = PipelineMap::new();
        input.insert("hello".to_string(), string_value("world"));
        input.insert("hello2".to_string(), string_value("world2"));

        let output = processor.exec_mut(input);
        assert!(output.is_ok());
        output.unwrap()
    }

    #[test]
    fn test_select() {
        let output = run_select(None, SelectType::Include);
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello"), Some(&string_value("world")));
    }

    #[test]
    fn test_select_with_target() {
        let output = run_select(Some("hello3".to_string()), SelectType::Include);
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello3"), Some(&string_value("world")));
    }

    #[test]
    fn test_select_with_exclude() {
        let output = run_select(None, SelectType::Exclude);
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello"), None);
        assert_eq!(output.get("hello2"), Some(&string_value("world2")));
    }
}