pipeline/etl/processor/
select.rs1use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17
18use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
19use crate::etl::field::Fields;
20use crate::etl::processor::{
21 yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
22};
23use crate::{PipelineMap, Processor};
24
/// Processor name returned by `SelectProcessor::kind` and used in its error messages.
pub(crate) const PROCESSOR_SELECT: &str = "select";
/// Value of the `type` option that maps to `SelectType::Include`.
const INCLUDE_KEY: &str = "include";
/// Value of the `type` option that maps to `SelectType::Exclude`.
const EXCLUDE_KEY: &str = "exclude";
28
/// How a [`SelectProcessor`] treats its configured fields.
///
/// Derives `Clone`/`Copy`/`PartialEq`/`Eq` because it is a fieldless public enum:
/// copying is free and equality lets callers and tests compare modes directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the listed fields, renaming those that specify a target name.
    #[default]
    Include,
    /// Remove the listed fields and keep every other key.
    Exclude,
}
35
36impl TryFrom<String> for SelectType {
37 type Error = Error;
38
39 fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
40 match value.as_str() {
41 INCLUDE_KEY => Ok(SelectType::Include),
42 EXCLUDE_KEY => Ok(SelectType::Exclude),
43 _ => ProcessorUnsupportedValueSnafu {
44 processor: PROCESSOR_SELECT.to_string(),
45 val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
46 }
47 .fail(),
48 }
49 }
50}
51
52#[derive(Debug, Default)]
53pub struct SelectProcessor {
54 fields: Fields,
55 select_type: SelectType,
56}
57
58impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
59 type Error = Error;
60
61 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
62 let mut fields = Fields::default();
63 let mut select_type = SelectType::default();
64
65 for (k, v) in value.iter() {
66 let key = k
67 .as_str()
68 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
69 match key {
70 FIELD_NAME => {
71 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
72 }
73 FIELDS_NAME => {
74 fields = yaml_new_fields(v, FIELDS_NAME)?;
75 }
76 TYPE_NAME => {
77 select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
78 }
79 _ => {}
80 }
81 }
82
83 Ok(SelectProcessor {
84 fields,
85 select_type,
86 })
87 }
88}
89
90impl Processor for SelectProcessor {
91 fn kind(&self) -> &str {
92 PROCESSOR_SELECT
93 }
94
95 fn ignore_missing(&self) -> bool {
96 true
97 }
98
99 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
100 match self.select_type {
101 SelectType::Include => {
102 let mut include_key_set = HashSet::with_capacity(val.len());
103 for field in self.fields.iter() {
104 let field_name = field.input_field();
106 if let Some(target_name) = field.target_field() {
107 if let Some(v) = val.remove(field_name) {
108 val.insert(target_name.to_string(), v);
109 }
110 include_key_set.insert(target_name);
111 } else {
112 include_key_set.insert(field_name);
113 }
114 }
115 val.retain(|k, _| include_key_set.contains(k.as_str()));
116 }
117 SelectType::Exclude => {
118 for field in self.fields.iter() {
119 val.remove(field.input_field());
120 }
121 }
122 }
123
124 Ok(())
125 }
126}
127
128#[cfg(test)]
129mod test {
130 use crate::etl::field::{Field, Fields};
131 use crate::etl::processor::select::{SelectProcessor, SelectType};
132 use crate::{PipelineMap, Processor, Value};
133
134 #[test]
135 fn test_select() {
136 let processor = SelectProcessor {
137 fields: Fields::one(Field::new("hello", None)),
138 select_type: SelectType::Include,
139 };
140
141 let mut p = PipelineMap::new();
142 p.insert("hello".to_string(), Value::String("world".to_string()));
143 p.insert("hello2".to_string(), Value::String("world2".to_string()));
144
145 let result = processor.exec_mut(&mut p);
146 assert!(result.is_ok());
147 assert_eq!(p.len(), 1);
148 assert_eq!(p.get("hello"), Some(&Value::String("world".to_string())));
149 }
150
151 #[test]
152 fn test_select_with_target() {
153 let processor = SelectProcessor {
154 fields: Fields::one(Field::new("hello", Some("hello3".to_string()))),
155 select_type: SelectType::Include,
156 };
157
158 let mut p = PipelineMap::new();
159 p.insert("hello".to_string(), Value::String("world".to_string()));
160 p.insert("hello2".to_string(), Value::String("world2".to_string()));
161
162 let result = processor.exec_mut(&mut p);
163 assert!(result.is_ok());
164 assert_eq!(p.len(), 1);
165 assert_eq!(p.get("hello3"), Some(&Value::String("world".to_string())));
166 }
167
168 #[test]
169 fn test_select_with_exclude() {
170 let processor = SelectProcessor {
171 fields: Fields::one(Field::new("hello", None)),
172 select_type: SelectType::Exclude,
173 };
174
175 let mut p = PipelineMap::new();
176 p.insert("hello".to_string(), Value::String("world".to_string()));
177 p.insert("hello2".to_string(), Value::String("world2".to_string()));
178
179 let result = processor.exec_mut(&mut p);
180 assert!(result.is_ok());
181 assert_eq!(p.len(), 1);
182 assert_eq!(p.get("hello"), None);
183 assert_eq!(p.get("hello2"), Some(&Value::String("world2".to_string())));
184 }
185}