pipeline/etl/processor/
simple_extract.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt as _;
16
17use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result};
18use crate::etl::field::Fields;
19use crate::etl::processor::{
20    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
21    IGNORE_MISSING_NAME, KEY_NAME,
22};
23use crate::{PipelineMap, Processor, Value};
24
/// Identifier of this processor; returned by `SimpleExtractProcessor::kind`
/// and reported as the processor name in its errors.
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";
26
27#[derive(Debug, Default)]
28pub struct SimpleExtractProcessor {
29    fields: Fields,
30    /// simple keys to extract nested JSON field
31    /// key `a.b` is saved as  ['a', 'b'], each key represents a level of the JSON tree
32    key: Vec<String>,
33    ignore_missing: bool,
34}
35
36impl TryFrom<&yaml_rust::yaml::Hash> for SimpleExtractProcessor {
37    type Error = Error;
38
39    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
40        let mut fields = Fields::default();
41        let mut ignore_missing = false;
42        let mut keys = vec![];
43
44        for (k, v) in value.iter() {
45            let key = k
46                .as_str()
47                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
48            match key {
49                FIELD_NAME => {
50                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
51                }
52                FIELDS_NAME => {
53                    fields = yaml_new_fields(v, FIELDS_NAME)?;
54                }
55                IGNORE_MISSING_NAME => {
56                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
57                }
58                KEY_NAME => {
59                    let key_str = yaml_string(v, KEY_NAME)?;
60                    keys.extend(key_str.split(".").map(|s| s.to_string()));
61                }
62                _ => {}
63            }
64        }
65
66        let processor = SimpleExtractProcessor {
67            fields,
68            key: keys,
69            ignore_missing,
70        };
71
72        Ok(processor)
73    }
74}
75
76impl SimpleExtractProcessor {
77    fn process_field(&self, val: &Value) -> Result<Value> {
78        let mut current = val;
79        for key in self.key.iter() {
80            let Value::Map(map) = current else {
81                return Ok(Value::Null);
82            };
83            let Some(v) = map.get(key) else {
84                return Ok(Value::Null);
85            };
86            current = v;
87        }
88        Ok(current.clone())
89    }
90}
91
92impl Processor for SimpleExtractProcessor {
93    fn kind(&self) -> &str {
94        PROCESSOR_SIMPLE_EXTRACT
95    }
96
97    fn ignore_missing(&self) -> bool {
98        self.ignore_missing
99    }
100
101    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
102        for field in self.fields.iter() {
103            let index = field.input_field();
104            match val.get(index) {
105                Some(v) => {
106                    let processed = self.process_field(v)?;
107                    let output_index = field.target_or_input_field();
108                    val.insert(output_index.to_string(), processed);
109                }
110                None => {
111                    if !self.ignore_missing {
112                        return ProcessorMissingFieldSnafu {
113                            processor: self.kind(),
114                            field: field.input_field(),
115                        }
116                        .fail();
117                    }
118                }
119            }
120        }
121        Ok(())
122    }
123}
124
#[cfg(test)]
mod test {
    use super::*;
    use crate::{Map, Value};

    /// Builds a processor that follows the given path segments.
    fn extractor(path: &[&str]) -> SimpleExtractProcessor {
        SimpleExtractProcessor {
            key: path.iter().map(|s| s.to_string()).collect(),
            ..Default::default()
        }
    }

    fn string(s: &str) -> Value {
        Value::String(s.to_string())
    }

    #[test]
    fn test_simple_extract() {
        let result = extractor(&["hello"])
            .process_field(&Value::Map(Map::one("hello", string("world"))))
            .unwrap();

        assert_eq!(result, string("world"));
    }

    #[test]
    fn test_simple_extract_nested_path() {
        let input = Value::Map(Map::one("a", Value::Map(Map::one("b", string("deep")))));

        let result = extractor(&["a", "b"]).process_field(&input).unwrap();

        assert_eq!(result, string("deep"));
    }

    #[test]
    fn test_simple_extract_missing_segment_yields_null() {
        let input = Value::Map(Map::one("hello", string("world")));

        let result = extractor(&["absent"]).process_field(&input).unwrap();

        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_simple_extract_through_non_map_yields_null() {
        // "hello" resolves to a string, so there is no map to descend into.
        let input = Value::Map(Map::one("hello", string("world")));

        let result = extractor(&["hello", "inner"]).process_field(&input).unwrap();

        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_simple_extract_empty_path_returns_input() {
        let input = Value::Map(Map::one("hello", string("world")));

        let result = extractor(&[]).process_field(&input).unwrap();

        assert_eq!(result, input);
    }
}