pipeline/etl/processor/
select.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashSet, HashSetExt};
16use snafu::OptionExt;
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20    Error, KeyMustBeStringSnafu, ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, TYPE_NAME,
25};
26use crate::Processor;
27
/// Kind name of the select processor; returned by `SelectProcessor::kind`
/// and embedded in the error raised for an unsupported select type.
pub(crate) const PROCESSOR_SELECT: &str = "select";
/// Select-type string that parses to `SelectType::Include`.
const INCLUDE_KEY: &str = "include";
/// Select-type string that parses to `SelectType::Exclude`.
const EXCLUDE_KEY: &str = "exclude";
31
/// Selection mode of the `select` processor.
///
/// The type is a plain, field-less tag, so it is cheap to copy and can be
/// compared directly (e.g. in tests or when inspecting a parsed processor).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SelectType {
    /// Keep only the configured fields and drop every other key of the map.
    /// This is the default mode.
    #[default]
    Include,
    /// Remove the configured fields and leave every other key untouched.
    Exclude,
}
38
39impl TryFrom<String> for SelectType {
40    type Error = Error;
41
42    fn try_from(value: String) -> std::result::Result<Self, Self::Error> {
43        match value.as_str() {
44            INCLUDE_KEY => Ok(SelectType::Include),
45            EXCLUDE_KEY => Ok(SelectType::Exclude),
46            _ => ProcessorUnsupportedValueSnafu {
47                processor: PROCESSOR_SELECT.to_string(),
48                val: format!("'{}', expect '{}' or '{}'", value, INCLUDE_KEY, EXCLUDE_KEY),
49            }
50            .fail(),
51        }
52    }
53}
54
/// Processor that keeps or removes a configured set of fields from a map
/// value, depending on its [`SelectType`].
#[derive(Debug, Default)]
pub struct SelectProcessor {
    /// Fields the selection applies to. In include mode a field's target
    /// name, when present, is the key the value ends up under.
    fields: Fields,
    /// Whether the listed fields are kept (`Include`, the default) or
    /// removed (`Exclude`).
    select_type: SelectType,
}
60
61impl TryFrom<&yaml_rust::yaml::Hash> for SelectProcessor {
62    type Error = Error;
63
64    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
65        let mut fields = Fields::default();
66        let mut select_type = SelectType::default();
67
68        for (k, v) in value.iter() {
69            let key = k
70                .as_str()
71                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
72            match key {
73                FIELD_NAME => {
74                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
75                }
76                FIELDS_NAME => {
77                    fields = yaml_new_fields(v, FIELDS_NAME)?;
78                }
79                TYPE_NAME => {
80                    select_type = SelectType::try_from(yaml_string(v, TYPE_NAME)?)?;
81                }
82                _ => {}
83            }
84        }
85
86        Ok(SelectProcessor {
87            fields,
88            select_type,
89        })
90    }
91}
92
93impl Processor for SelectProcessor {
94    fn kind(&self) -> &str {
95        PROCESSOR_SELECT
96    }
97
98    fn ignore_missing(&self) -> bool {
99        true
100    }
101
102    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
103        let v_map = val.as_object_mut().context(ValueMustBeMapSnafu)?;
104
105        match self.select_type {
106            SelectType::Include => {
107                let mut include_key_set = HashSet::with_capacity(v_map.len());
108                for field in self.fields.iter() {
109                    // If the field has a target, move the value to the target
110                    let field_name = field.input_field();
111                    if let Some(target_name) = field.target_field() {
112                        if let Some(v) = v_map.remove(field_name) {
113                            v_map.insert(KeyString::from(target_name), v);
114                        }
115                        include_key_set.insert(target_name);
116                    } else {
117                        include_key_set.insert(field_name);
118                    }
119                }
120                v_map.retain(|k, _| include_key_set.contains(k.as_str()));
121            }
122            SelectType::Exclude => {
123                for field in self.fields.iter() {
124                    v_map.remove(field.input_field());
125                }
126            }
127        }
128
129        Ok(val)
130    }
131}
132
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;
    use vrl::value::{KeyString, Value as VrlValue};

    use crate::etl::field::{Field, Fields};
    use crate::etl::processor::select::{SelectProcessor, SelectType};
    use crate::Processor;

    /// Wraps `text` as a VRL bytes value.
    fn bytes(text: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(text.to_string()))
    }

    /// Builds the two-entry object (`hello` / `hello2`) every test starts from.
    fn sample_event() -> VrlValue {
        let event: BTreeMap<KeyString, VrlValue> = [("hello", "world"), ("hello2", "world2")]
            .iter()
            .map(|&(key, text)| (KeyString::from(key), bytes(text)))
            .collect();
        VrlValue::Object(event)
    }

    /// Runs a single-field select processor over the sample event and returns
    /// the processed value, failing the test if execution errors.
    fn run_select(field: Field, select_type: SelectType) -> VrlValue {
        let processor = SelectProcessor {
            fields: Fields::one(field),
            select_type,
        };
        let outcome = processor.exec_mut(sample_event());
        assert!(outcome.is_ok());
        outcome.unwrap()
    }

    #[test]
    fn test_select() {
        let mut output = run_select(Field::new("hello", None), SelectType::Include);
        let remaining = output.as_object_mut().unwrap();

        assert_eq!(remaining.len(), 1);
        assert_eq!(
            remaining.get(&KeyString::from("hello")),
            Some(&bytes("world"))
        );
    }

    #[test]
    fn test_select_with_target() {
        let mut output = run_select(
            Field::new("hello", Some("hello3".to_string())),
            SelectType::Include,
        );
        let remaining = output.as_object_mut().unwrap();

        // Only the renamed entry is left.
        assert_eq!(remaining.len(), 1);
        assert_eq!(
            remaining.get(&KeyString::from("hello3")),
            Some(&bytes("world"))
        );
    }

    #[test]
    fn test_select_with_exclude() {
        let mut output = run_select(Field::new("hello", None), SelectType::Exclude);
        let remaining = output.as_object_mut().unwrap();

        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining.get(&KeyString::from("hello")), None);
        assert_eq!(
            remaining.get(&KeyString::from("hello2")),
            Some(&bytes("world2"))
        );
    }
}