pipeline/etl/processor/
json_path.rs1use jsonpath_rust::JsonPath;
16use snafu::{OptionExt, ResultExt};
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20 Error, JsonParseSnafu, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
21 ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26 IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
27};
28
29pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
30
31impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessor {
32 type Error = Error;
33
34 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
35 let mut fields = Fields::default();
36 let mut ignore_missing = false;
37 let mut json_path = None;
38 let mut result_idex = None;
39
40 for (k, v) in value.iter() {
41 let key = k
42 .as_str()
43 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
44 match key {
45 FIELD_NAME => {
46 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
47 }
48 FIELDS_NAME => {
49 fields = yaml_new_fields(v, FIELDS_NAME)?;
50 }
51
52 IGNORE_MISSING_NAME => {
53 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
54 }
55 JSON_PATH_RESULT_INDEX_NAME => {
56 result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
57 }
58
59 JSON_PATH_NAME => {
60 let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
61 json_path = Some(
62 JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
63 );
64 }
65
66 _ => {}
67 }
68 }
69
70 let processor = JsonPathProcessor {
71 fields,
72 json_path: json_path.context(ProcessorMissingFieldSnafu {
73 processor: PROCESSOR_JSON_PATH,
74 field: JSON_PATH_NAME,
75 })?,
76 ignore_missing,
77 result_index: result_idex,
78 };
79
80 Ok(processor)
81 }
82}
83
84#[derive(Debug)]
85pub struct JsonPathProcessor {
86 fields: Fields,
87 json_path: JsonPath<serde_json::Value>,
88 ignore_missing: bool,
89 result_index: Option<usize>,
90}
91
92impl Default for JsonPathProcessor {
93 fn default() -> Self {
94 JsonPathProcessor {
95 fields: Fields::default(),
96 json_path: JsonPath::try_from("$").unwrap(),
97 ignore_missing: false,
98 result_index: None,
99 }
100 }
101}
102
103impl JsonPathProcessor {
104 fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
105 let v = serde_json::to_value(val).context(JsonParseSnafu)?;
106 let p = self.json_path.find(&v);
107 match p {
108 serde_json::Value::Array(arr) => {
109 if let Some(index) = self.result_index {
110 Ok(arr
111 .get(index)
112 .cloned()
113 .map(|v| v.into())
114 .unwrap_or(VrlValue::Null))
115 } else {
116 Ok(VrlValue::Array(arr.into_iter().map(|v| v.into()).collect()))
117 }
118 }
119 v => Ok(v.into()),
120 }
121 }
122}
123
124impl Processor for JsonPathProcessor {
125 fn kind(&self) -> &str {
126 PROCESSOR_JSON_PATH
127 }
128
129 fn ignore_missing(&self) -> bool {
130 self.ignore_missing
131 }
132
133 fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
134 for field in self.fields.iter() {
135 let index = field.input_field();
136 let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
137 match val.get(index) {
138 Some(v) => {
139 let processed = self.process_field(v)?;
140 let output_index = field.target_or_input_field();
141 val.insert(KeyString::from(output_index), processed);
142 }
143 None => {
144 if !self.ignore_missing {
145 return ProcessorMissingFieldSnafu {
146 processor: self.kind(),
147 field: field.input_field(),
148 }
149 .fail();
150 }
151 }
152 }
153 }
154 Ok(val)
155 }
156}
157
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;

    use super::*;

    #[test]
    fn test_json_path() {
        let processor = JsonPathProcessor {
            json_path: JsonPath::try_from("$.hello").unwrap(),
            result_index: Some(0),
            ..Default::default()
        };

        let input = VrlValue::Object(BTreeMap::from([(
            KeyString::from("hello"),
            VrlValue::Bytes(Bytes::from("world")),
        )]));
        let output = processor.process_field(&input).unwrap();

        assert_eq!(output, VrlValue::Bytes(Bytes::from("world")));
    }
}