pipeline/etl/processor/
select.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17
18use crate::error::{Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result};
19use crate::etl::field::Fields;
20use crate::etl::processor::{
21    yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
22};
23use crate::{PipelineMap, Processor};
24
/// Kind name of this processor; returned by `SelectProcessor::kind` and used in error messages.
pub(crate) const PROCESSOR_SELECT: &'static str = "select";
/// Accepted select-type string meaning "keep only the listed fields".
const INCLUDE_KEY: &'static str = "include";
/// Accepted select-type string meaning "drop the listed fields".
const EXCLUDE_KEY: &'static str = "exclude";
28
/// Whether a `select` processor keeps or drops its configured fields.
///
/// Parsed from the strings `"include"` / `"exclude"`; defaults to [`SelectType::Include`].
/// Being a fieldless enum, it is cheap to copy and compare, so those traits are derived.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the configured fields (renaming those that declare a target).
    #[default]
    Include,
    /// Remove the configured fields and keep everything else.
    Exclude,
}
35
36impl TryFrom<String> for SelectType {
37    type Error = Error;
38
39    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
40        match value.as_str() {
41            INCLUDE_KEY => Ok(SelectType::Include),
42            EXCLUDE_KEY => Ok(SelectType::Exclude),
43            _ => ProcessorUnsupportedValueSnafu {
44                processor: PROCESSOR_SELECT.to_string(),
45                val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
46            }
47            .fail(),
48        }
49    }
50}
51
52#[derive(Debug, Default)]
53pub struct SelectProcessor {
54    fields: Fields,
55    select_type: SelectType,
56}
57
58impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
59    type Error = Error;
60
61    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
62        let mut fields = Fields::default();
63        let mut select_type = SelectType::default();
64
65        for (k, v) in value.iter() {
66            let key = k
67                .as_str()
68                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
69            match key {
70                FIELD_NAME => {
71                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
72                }
73                FIELDS_NAME => {
74                    fields = yaml_new_fields(v, FIELDS_NAME)?;
75                }
76                TYPE_NAME => {
77                    select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
78                }
79                _ => {}
80            }
81        }
82
83        Ok(SelectProcessor {
84            fields,
85            select_type,
86        })
87    }
88}
89
90impl Processor for SelectProcessor {
91    fn kind(&self) -> &str {
92        PROCESSOR_SELECT
93    }
94
95    fn ignore_missing(&self) -> bool {
96        true
97    }
98
99    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
100        match self.select_type {
101            SelectType::Include => {
102                let mut include_key_set = HashSet::with_capacity(val.len());
103                for field in self.fields.iter() {
104                    // If the field has a target, move the value to the target
105                    let field_name = field.input_field();
106                    if let Some(target_name) = field.target_field() {
107                        if let Some(v) = val.remove(field_name) {
108                            val.insert(target_name.to_string(), v);
109                        }
110                        include_key_set.insert(target_name);
111                    } else {
112                        include_key_set.insert(field_name);
113                    }
114                }
115                val.retain(|k, _| include_key_set.contains(k.as_str()));
116            }
117            SelectType::Exclude => {
118                for field in self.fields.iter() {
119                    val.remove(field.input_field());
120                }
121            }
122        }
123
124        Ok(val)
125    }
126}
127
#[cfg(test)]
mod test {
    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::select::{SelectProcessor, SelectType};
    use crate::{PipelineMap, Processor, Value};

    /// Shorthand for building a `Value::String`.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// The two-key record `{hello: "world", hello2: "world2"}` shared by every test.
    fn sample_record() -> PipelineMap {
        let mut record = PipelineMap::new();
        record.insert("hello".to_string(), text("world"));
        record.insert("hello2".to_string(), text("world2"));
        record
    }

    #[test]
    fn test_select() {
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", None)),
            select_type: SelectType::Include,
        };

        let output = processor
            .exec_mut(sample_record())
            .expect("include select should succeed");
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello"), Some(&text("world")));
    }

    #[test]
    fn test_select_with_target() {
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", Some("hello3".to_string()))),
            select_type: SelectType::Include,
        };

        let output = processor
            .exec_mut(sample_record())
            .expect("include select with target should succeed");
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello3"), Some(&text("world")));
    }

    #[test]
    fn test_select_with_exclude() {
        let processor = SelectProcessor {
            fields: Fields::one(Field::new("hello", None)),
            select_type: SelectType::Exclude,
        };

        let output = processor
            .exec_mut(sample_record())
            .expect("exclude select should succeed");
        assert_eq!(output.len(), 1);
        assert_eq!(output.get("hello"), None);
        assert_eq!(output.get("hello2"), Some(&text("world2")));
    }
}