pipeline/etl/processor/
json_parse.rs1use snafu::{OptionExt as _, ResultExt};
16
17use crate::error::{
18 Error, FieldMustBeTypeSnafu, JsonParseSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu,
19 ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23 yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
24};
25use crate::{json_to_map, PipelineMap, Processor, Value};
26
/// Kind name of the JSON-parse processor; this is the value returned by
/// `JsonParseProcessor::kind`. Presumably also the key that selects this processor
/// in a pipeline definition — NOTE(review): confirm against the processor registry.
pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";

/// Processor that parses JSON-encoded string fields into structured values.
///
/// Each configured input field must hold a string containing a JSON object or array;
/// the parsed result is stored as a map or array value (see `process_field`).
#[derive(Debug, Default)]
pub struct JsonParseProcessor {
    /// Fields to parse. The result for each one is written to its target field, or
    /// back to the input field when `target_or_input_field` resolves to it.
    fields: Fields,
    /// When `true`, input fields absent from the map are skipped instead of
    /// producing a missing-field error.
    ignore_missing: bool,
}
34
35impl TryFrom<&yaml_rust::yaml::Hash> for JsonParseProcessor {
36 type Error = Error;
37
38 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
39 let mut fields = Fields::default();
40 let mut ignore_missing = false;
41
42 for (k, v) in value.iter() {
43 let key = k
44 .as_str()
45 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
46 match key {
47 FIELD_NAME => {
48 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
49 }
50 FIELDS_NAME => {
51 fields = yaml_new_fields(v, FIELDS_NAME)?;
52 }
53 IGNORE_MISSING_NAME => {
54 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
55 }
56 _ => {}
57 }
58 }
59
60 let processor = JsonParseProcessor {
61 fields,
62 ignore_missing,
63 };
64
65 Ok(processor)
66 }
67}
68
69impl JsonParseProcessor {
70 fn process_field(&self, val: &Value) -> Result<Value> {
71 let Some(json_str) = val.as_str() else {
72 return FieldMustBeTypeSnafu {
73 field: val.to_str_type(),
74 ty: "string",
75 }
76 .fail();
77 };
78 let parsed: serde_json::Value = serde_json::from_str(json_str).context(JsonParseSnafu)?;
79 match parsed {
80 serde_json::Value::Object(_) => Ok(Value::Map(json_to_map(parsed)?.into())),
81 serde_json::Value::Array(arr) => Ok(Value::Array(arr.try_into()?)),
82 _ => ProcessorUnsupportedValueSnafu {
83 processor: self.kind(),
84 val: val.to_str_type(),
85 }
86 .fail(),
87 }
88 }
89}
90
91impl Processor for JsonParseProcessor {
92 fn kind(&self) -> &str {
93 PROCESSOR_JSON_PARSE
94 }
95
96 fn ignore_missing(&self) -> bool {
97 self.ignore_missing
98 }
99
100 fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
101 for field in self.fields.iter() {
102 let index = field.input_field();
103 match val.get(index) {
104 Some(v) => {
105 let processed = self.process_field(v)?;
106 let output_index = field.target_or_input_field();
107 val.insert(output_index.to_string(), processed);
108 }
109 None => {
110 if !self.ignore_missing {
111 return ProcessorMissingFieldSnafu {
112 processor: self.kind(),
113 field: field.input_field(),
114 }
115 .fail();
116 }
117 }
118 }
119 }
120 Ok(val)
121 }
122}
123
#[cfg(test)]
mod test {
    use super::*;
    use crate::Value;

    /// Runs a default processor's `process_field` on a string input.
    fn parse(input: &str) -> crate::error::Result<Value> {
        JsonParseProcessor::default().process_field(&Value::String(input.to_string()))
    }

    #[test]
    fn test_json_parse() {
        let result = parse(r#"{"hello": "world"}"#).unwrap();

        let expected = Value::Map(crate::Map::one(
            "hello".to_string(),
            Value::String("world".to_string()),
        ));

        assert_eq!(result, expected);
    }

    #[test]
    fn test_json_parse_rejects_invalid_json() {
        assert!(parse(r#"{"hello": "#).is_err());
    }

    #[test]
    fn test_json_parse_rejects_scalar_documents() {
        // Only objects and arrays are supported at the top level.
        for scalar in ["123", "true", "null", r#""text""#] {
            assert!(parse(scalar).is_err(), "expected {scalar} to be rejected");
        }
    }
}