pipeline/etl/processor/
json_parse.rs1use snafu::{OptionExt as _, ResultExt};
16use vrl::value::{KeyString, Value as VrlValue};
17
18use crate::error::{
19 Error, FieldMustBeTypeSnafu, JsonParseSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu,
20 ProcessorUnsupportedValueSnafu, Result, ValueMustBeMapSnafu,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
25};
26use crate::Processor;
27
/// Kind identifier of the JSON-parse processor; this is the value returned by
/// `JsonParseProcessor::kind`.
pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";
29
30#[derive(Debug, Default)]
31pub struct JsonParseProcessor {
32 fields: Fields,
33 ignore_missing: bool,
34}
35
36impl TryFrom<&yaml_rust::yaml::Hash> for JsonParseProcessor {
37 type Error = Error;
38
39 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
40 let mut fields = Fields::default();
41 let mut ignore_missing = false;
42
43 for (k, v) in value.iter() {
44 let key = k
45 .as_str()
46 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
47 match key {
48 FIELD_NAME => {
49 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
50 }
51 FIELDS_NAME => {
52 fields = yaml_new_fields(v, FIELDS_NAME)?;
53 }
54 IGNORE_MISSING_NAME => {
55 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
56 }
57 _ => {}
58 }
59 }
60
61 let processor = JsonParseProcessor {
62 fields,
63 ignore_missing,
64 };
65
66 Ok(processor)
67 }
68}
69
70impl JsonParseProcessor {
71 fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
72 let Some(json_str) = val.as_str() else {
73 return FieldMustBeTypeSnafu {
74 field: val.to_string(),
75 ty: "string",
76 }
77 .fail();
78 };
79 let parsed: VrlValue = serde_json::from_str(&json_str).context(JsonParseSnafu)?;
80 match parsed {
81 VrlValue::Object(_) => Ok(parsed),
82 VrlValue::Array(_) => Ok(parsed),
83 _ => ProcessorUnsupportedValueSnafu {
84 processor: self.kind(),
85 val: val.to_string(),
86 }
87 .fail(),
88 }
89 }
90}
91
92impl Processor for JsonParseProcessor {
93 fn kind(&self) -> &str {
94 PROCESSOR_JSON_PARSE
95 }
96
97 fn ignore_missing(&self) -> bool {
98 self.ignore_missing
99 }
100
101 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
102 for field in self.fields.iter() {
103 let index = field.input_field();
104 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
105 match val.get(index) {
106 Some(v) => {
107 let processed = self.process_field(v)?;
108 let output_index = field.target_or_input_field();
109 val.insert(KeyString::from(output_index.to_string()), processed);
110 }
111 None => {
112 if !self.ignore_missing {
113 return ProcessorMissingFieldSnafu {
114 processor: self.kind(),
115 field: field.input_field(),
116 }
117 .fail();
118 }
119 }
120 }
121 }
122 Ok(val)
123 }
124}
125
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;
    use vrl::value::{KeyString, Value as VrlValue};

    use crate::etl::processor::json_parse::JsonParseProcessor;

    /// Wraps a string literal in the byte-string value the processor expects.
    fn text(s: &str) -> VrlValue {
        VrlValue::Bytes(Bytes::from(s.to_string()))
    }

    /// A top-level JSON object is parsed into an object value.
    #[test]
    fn test_json_parse() {
        let processor = JsonParseProcessor::default();

        let result = processor
            .process_field(&text(r#"{"hello": "world"}"#))
            .unwrap();

        let expected = VrlValue::Object(BTreeMap::from([(
            KeyString::from("hello"),
            VrlValue::Bytes(Bytes::from("world")),
        )]));

        assert_eq!(result, expected);
    }

    /// A top-level JSON array is accepted as well.
    #[test]
    fn test_json_parse_array() {
        let processor = JsonParseProcessor::default();

        let result = processor.process_field(&text(r#"["a", "b"]"#)).unwrap();

        let expected = VrlValue::Array(vec![
            VrlValue::Bytes(Bytes::from("a")),
            VrlValue::Bytes(Bytes::from("b")),
        ]);

        assert_eq!(result, expected);
    }

    /// Valid JSON whose top level is a scalar is rejected.
    #[test]
    fn test_json_parse_rejects_scalar() {
        let processor = JsonParseProcessor::default();

        assert!(processor.process_field(&text("42")).is_err());
        assert!(processor.process_field(&text(r#""just a string""#)).is_err());
    }

    /// Malformed JSON text surfaces as an error instead of panicking.
    #[test]
    fn test_json_parse_rejects_invalid_json() {
        let processor = JsonParseProcessor::default();

        assert!(processor.process_field(&text("{not json")).is_err());
    }

    /// A non-string input value is rejected before any parsing is attempted.
    #[test]
    fn test_json_parse_rejects_non_string_input() {
        let processor = JsonParseProcessor::default();

        assert!(processor.process_field(&VrlValue::Integer(1)).is_err());
    }
}