pipeline/etl/processor/
json_parse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::{OptionExt as _, ResultExt};
16use vrl::value::{KeyString, Value as VrlValue};
17
18use crate::error::{
19    Error, FieldMustBeTypeSnafu, JsonParseSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu,
20    ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
25};
26use crate::Processor;
27
28pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";
29
30#[derive(Debug, Default)]
31pub struct JsonParseProcessor {
32    fields: Fields,
33    ignore_missing: bool,
34}
35
36impl TryFrom<&yaml_rust::yaml::Hash> for JsonParseProcessor {
37    type Error = Error;
38
39    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
40        let mut fields = Fields::default();
41        let mut ignore_missing = false;
42
43        for (k, v) in value.iter() {
44            let key = k
45                .as_str()
46                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
47            match key {
48                FIELD_NAME => {
49                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
50                }
51                FIELDS_NAME => {
52                    fields = yaml_new_fields(v, FIELDS_NAME)?;
53                }
54                IGNORE_MISSING_NAME => {
55                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
56                }
57                _ => {}
58            }
59        }
60
61        let processor = JsonParseProcessor {
62            fields,
63            ignore_missing,
64        };
65
66        Ok(processor)
67    }
68}
69
70impl JsonParseProcessor {
71    fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
72        let Some(json_str) = val.as_str() else {
73            return FieldMustBeTypeSnafu {
74                field: val.to_string(),
75                ty: "string",
76            }
77            .fail();
78        };
79        let parsed: VrlValue = serde_json::from_str(&json_str).context(JsonParseSnafu)?;
80        match parsed {
81            VrlValue::Object(_) => Ok(parsed),
82            VrlValue::Array(_) => Ok(parsed),
83            _ => ProcessorUnsupportedValueSnafu {
84                processor: self.kind(),
85                val: val.to_string(),
86            }
87            .fail(),
88        }
89    }
90}
91
92impl Processor for JsonParseProcessor {
93    fn kind(&self) -> &str {
94        PROCESSOR_JSON_PARSE
95    }
96
97    fn ignore_missing(&self) -> bool {
98        self.ignore_missing
99    }
100
101    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
102        for field in self.fields.iter() {
103            let index = field.input_field();
104            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
105            match val.get(index) {
106                Some(v) => {
107                    let processed = self.process_field(v)?;
108                    let output_index = field.target_or_input_field();
109                    val.insert(KeyString::from(output_index.to_string()), processed);
110                }
111                None => {
112                    if !self.ignore_missing {
113                        return ProcessorMissingFieldSnafu {
114                            processor: self.kind(),
115                            field: field.input_field(),
116                        }
117                        .fail();
118                    }
119                }
120            }
121        }
122        Ok(val)
123    }
124}
125
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;
    use vrl::value::{KeyString, Value as VrlValue};

    use crate::etl::processor::json_parse::JsonParseProcessor;

    /// Wraps a string literal as a VRL bytes value.
    fn bytes(s: &'static str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(s))
    }

    #[test]
    fn test_json_parse() {
        let processor = JsonParseProcessor::default();

        let result = processor
            .process_field(&bytes(r#"{"hello": "world"}"#))
            .unwrap();

        let expected = VrlValue::Object(BTreeMap::from([(
            KeyString::from("hello"),
            bytes("world"),
        )]));

        assert_eq!(result, expected);
    }

    #[test]
    fn test_json_parse_array() {
        let processor = JsonParseProcessor::default();

        let result = processor.process_field(&bytes(r#"["a", "b"]"#)).unwrap();

        assert_eq!(result, VrlValue::Array(vec![bytes("a"), bytes("b")]));
    }

    #[test]
    fn test_json_parse_rejects_scalar_json() {
        let processor = JsonParseProcessor::default();

        // Valid JSON, but neither an object nor an array.
        for text in ["42", r#""plain""#, "true", "null"] {
            assert!(processor.process_field(&bytes(text)).is_err(), "{text}");
        }
    }

    #[test]
    fn test_json_parse_rejects_invalid_json() {
        let processor = JsonParseProcessor::default();

        assert!(processor.process_field(&bytes("{not json")).is_err());
    }

    #[test]
    fn test_json_parse_rejects_non_string_input() {
        let processor = JsonParseProcessor::default();

        assert!(processor.process_field(&VrlValue::Integer(1)).is_err());
    }
}