pipeline/etl/processor/
select.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17
18use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
19use crate::etl::field::Fields;
20use crate::etl::processor::{
21    yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
22};
23use crate::{PipelineMap, Processor};
24
/// Name of this processor; returned by `SelectProcessor::kind` and embedded in
/// the error raised for an unsupported select type.
pub(crate) const PROCESSOR_SELECT: &str = "select";
/// Select-type string that maps to `SelectType::Include`.
const INCLUDE_KEY: &str = "include";
/// Select-type string that maps to `SelectType::Exclude`.
const EXCLUDE_KEY: &str = "exclude";
28
/// How a select processor treats its configured fields.
///
/// The type is a plain tag, so it is cheap to copy and can be compared
/// directly (for example in assertions).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the configured fields and drop everything else. This is the
    /// default mode.
    #[default]
    Include,
    /// Drop the configured fields and keep everything else.
    Exclude,
}
35
36impl TryFrom<String> for SelectType {
37    type Error = Error;
38
39    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
40        match value.as_str() {
41            INCLUDE_KEY => Ok(SelectType::Include),
42            EXCLUDE_KEY => Ok(SelectType::Exclude),
43            _ => ProcessorUnsupportedValueSnafu {
44                processor: PROCESSOR_SELECT.to_string(),
45                val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
46            }
47            .fail(),
48        }
49    }
50}
51
/// Processor that either keeps only a configured set of fields or removes
/// that set, depending on `select_type`.
#[derive(Debug, Default)]
pub struct SelectProcessor {
    /// Fields the selection operates on.
    fields: Fields,
    /// Whether `fields` are the ones to keep (`Include`) or to drop (`Exclude`).
    select_type: SelectType,
}
57
58impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
59    type Error = Error;
60
61    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
62        let mut fields = Fields::default();
63        let mut select_type = SelectType::default();
64
65        for (k, v) in value.iter() {
66            let key = k
67                .as_str()
68                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
69            match key {
70                FIELD_NAME => {
71                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
72                }
73                FIELDS_NAME => {
74                    fields = yaml_new_fields(v, FIELDS_NAME)?;
75                }
76                TYPE_NAME => {
77                    select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
78                }
79                _ => {}
80            }
81        }
82
83        Ok(SelectProcessor {
84            fields,
85            select_type,
86        })
87    }
88}
89
90impl Processor for SelectProcessor {
91    fn kind(&self) -> &str {
92        PROCESSOR_SELECT
93    }
94
95    fn ignore_missing(&self) -> bool {
96        true
97    }
98
99    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
100        match self.select_type {
101            SelectType::Include => {
102                let mut include_key_set = HashSet::with_capacity(val.len());
103                for field in self.fields.iter() {
104                    // If the field has a target, move the value to the target
105                    let field_name = field.input_field();
106                    if let Some(target_name) = field.target_field() {
107                        if let Some(v) = val.remove(field_name) {
108                            val.insert(target_name.to_string(), v);
109                        }
110                        include_key_set.insert(target_name);
111                    } else {
112                        include_key_set.insert(field_name);
113                    }
114                }
115                val.retain(|k, _| include_key_set.contains(k.as_str()));
116            }
117            SelectType::Exclude => {
118                for field in self.fields.iter() {
119                    val.remove(field.input_field());
120                }
121            }
122        }
123
124        Ok(())
125    }
126}
127
#[cfg(test)]
mod test {
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::select::{SelectProcessor, SelectType};
    use crate::{PipelineMap, Processor, Value};

    /// Wraps a string slice into a `Value::String`.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// Builds the two-entry map (`hello` and `hello2`) every test starts from.
    fn sample_map() -> PipelineMap {
        let mut map = PipelineMap::new();
        for (key, value) in [("hello", "world"), ("hello2", "world2")] {
            map.insert(key.to_string(), text(value));
        }
        map
    }

    /// Builds a processor operating on a single field.
    fn single_field_processor(field: Field, select_type: SelectType) -> SelectProcessor {
        SelectProcessor {
            fields: Fields::one(field),
            select_type,
        }
    }

    #[test]
    fn test_select() {
        let processor = single_field_processor(Field::new("hello", None), SelectType::Include);
        let mut p = sample_map();

        let result = processor.exec_mut(&mut p);

        assert!(result.is_ok());
        assert_eq!(p.len(), 1);
        assert_eq!(p.get("hello"), Some(&text("world")));
    }

    #[test]
    fn test_select_with_target() {
        let processor = single_field_processor(
            Field::new("hello", Some("hello3".to_string())),
            SelectType::Include,
        );
        let mut p = sample_map();

        let result = processor.exec_mut(&mut p);

        assert!(result.is_ok());
        assert_eq!(p.len(), 1);
        assert_eq!(p.get("hello3"), Some(&text("world")));
    }

    #[test]
    fn test_select_with_exclude() {
        let processor = single_field_processor(Field::new("hello", None), SelectType::Exclude);
        let mut p = sample_map();

        let result = processor.exec_mut(&mut p);

        assert!(result.is_ok());
        assert_eq!(p.len(), 1);
        assert_eq!(p.get("hello"), None);
        assert_eq!(p.get("hello2"), Some(&text("world2")));
    }
}