pipeline/etl/processor/
simple_extract.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt as _;
16use vrl::value::{KeyString, Value as VrlValue};
17
18use crate::error::{
19    Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
24    IGNORE_MISSING_NAME, KEY_NAME,
25};
26use crate::Processor;
27
/// Kind name of the simple-extract processor; this is the string returned by
/// [`SimpleExtractProcessor`]'s `kind()` implementation.
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
29
30#[derive(Debug, Default)]
31pub struct SimpleExtractProcessor {
32    fields: Fields,
33    /// simple keys to extract nested JSON field
34    /// key `a.b` is saved as  ['a', 'b'], each key represents a level of the JSON tree
35    key: Vec<String>,
36    ignore_missing: bool,
37}
38
39impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
40    type Error = Error;
41
42    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
43        let mut fields = Fields::default();
44        let mut ignore_missing = false;
45        let mut keys = vec![];
46
47        for (k, v) in value.iter() {
48            let key = k
49                .as_str()
50                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
51            match key {
52                FIELD_NAME => {
53                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
54                }
55                FIELDS_NAME => {
56                    fields = yaml_new_fields(v, FIELDS_NAME)?;
57                }
58                IGNORE_MISSING_NAME => {
59                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
60                }
61                KEY_NAME => {
62                    let key_str = yaml_string(v, KEY_NAME)?;
63                    keys.extend(key_str.split(".").map(|s| s.to_string()));
64                }
65                _ => {}
66            }
67        }
68
69        let processor = SimpleExtractProcessor {
70            fields,
71            key: keys,
72            ignore_missing,
73        };
74
75        Ok(processor)
76    }
77}
78
79impl SimpleExtractProcessor {
80    fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
81        let mut current = val;
82        for key in self.key.iter() {
83            let VrlValue::Object(map) = current else {
84                return Ok(VrlValue::Null);
85            };
86            let Some(v) = map.get(key.as_str()) else {
87                return Ok(VrlValue::Null);
88            };
89            current = v;
90        }
91        Ok(current.clone())
92    }
93}
94
95impl Processor for SimpleExtractProcessor {
96    fn kind(&self) -> &str {
97        PROCESSOR_SIMPLE_EXTRACT
98    }
99
100    fn ignore_missing(&self) -> bool {
101        self.ignore_missing
102    }
103
104    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
105        for field in self.fields.iter() {
106            let index = field.input_field();
107            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
108            match val.get(index) {
109                Some(v) => {
110                    let processed = self.process_field(v)?;
111                    let output_index = field.target_or_input_field();
112                    val.insert(KeyString::from(output_index), processed);
113                }
114                None => {
115                    if !self.ignore_missing {
116                        return ProcessorMissingFieldSnafu {
117                            processor: self.kind(),
118                            field: field.input_field(),
119                        }
120                        .fail();
121                    }
122                }
123            }
124        }
125        Ok(val)
126    }
127}
128
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;

    use super::*;

    /// A single-segment key path returns the value stored under that key of a
    /// flat object.
    #[test]
    fn test_simple_extract() {
        let expected = VrlValue::Bytes(Bytes::from("world".to_string()));
        let input = VrlValue::Object(BTreeMap::from([(
            KeyString::from("hello"),
            VrlValue::Bytes(Bytes::from("world".to_string())),
        )]));

        let processor = SimpleExtractProcessor {
            key: vec!["hello".to_string()],
            ..Default::default()
        };

        let result = processor.process_field(&input).unwrap();

        assert_eq!(result, expected);
    }
}