pipeline/etl/processor/
json_path.rs1use jsonpath_rust::JsonPath;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19 Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
20 ProcessorMissingFieldSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24 yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
25 FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
26};
27use crate::Value;
28
/// Processor kind identifier for the JSONPath processor.
///
/// Returned by `JsonPathProcessor::kind` and reported as the processor name
/// when the mandatory JSONPath option is missing from the configuration.
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
30
31impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessor {
32 type Error = Error;
33
34 fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
35 let mut fields = Fields::default();
36 let mut ignore_missing = false;
37 let mut json_path = None;
38 let mut result_idex = None;
39
40 for (k, v) in value.iter() {
41 let key = k
42 .as_str()
43 .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
44 match key {
45 FIELD_NAME => {
46 fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
47 }
48 FIELDS_NAME => {
49 fields = yaml_new_fields(v, FIELDS_NAME)?;
50 }
51
52 IGNORE_MISSING_NAME => {
53 ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
54 }
55 JSON_PATH_RESULT_INDEX_NAME => {
56 result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
57 }
58
59 JSON_PATH_NAME => {
60 let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
61 json_path = Some(
62 JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
63 );
64 }
65
66 _ => {}
67 }
68 }
69
70 let processor = JsonPathProcessor {
71 fields,
72 json_path: json_path.context(ProcessorMissingFieldSnafu {
73 processor: PROCESSOR_JSON_PATH,
74 field: JSON_PATH_NAME,
75 })?,
76 ignore_missing,
77 result_index: result_idex,
78 };
79
80 Ok(processor)
81 }
82}
83
84#[derive(Debug)]
85pub struct JsonPathProcessor {
86 fields: Fields,
87 json_path: JsonPath<Value>,
88 ignore_missing: bool,
89 result_index: Option<usize>,
90}
91
92impl Default for JsonPathProcessor {
93 fn default() -> Self {
94 JsonPathProcessor {
95 fields: Fields::default(),
96 json_path: JsonPath::try_from("$").unwrap(),
97 ignore_missing: false,
98 result_index: None,
99 }
100 }
101}
102
103impl JsonPathProcessor {
104 fn process_field(&self, val: &Value) -> Result<Value> {
105 let processed = self.json_path.find(val);
106 match processed {
107 Value::Array(arr) => {
108 if let Some(index) = self.result_index {
109 Ok(arr.get(index).cloned().unwrap_or(Value::Null))
110 } else {
111 Ok(Value::Array(arr))
112 }
113 }
114 v => Ok(v),
115 }
116 }
117}
118
119impl Processor for JsonPathProcessor {
120 fn kind(&self) -> &str {
121 PROCESSOR_JSON_PATH
122 }
123
124 fn ignore_missing(&self) -> bool {
125 self.ignore_missing
126 }
127
128 fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
129 for field in self.fields.iter() {
130 let index = field.input_field();
131 match val.get(index) {
132 Some(v) => {
133 let processed = self.process_field(v)?;
134 let output_index = field.target_or_input_field();
135 val.insert(output_index.to_string(), processed);
136 }
137 None => {
138 if !self.ignore_missing {
139 return ProcessorMissingFieldSnafu {
140 processor: self.kind(),
141 field: field.input_field(),
142 }
143 .fail();
144 }
145 }
146 }
147 }
148 Ok(())
149 }
150}
151
#[cfg(test)]
mod test {
    use super::*;
    use crate::{Map, Value};

    /// `$.hello` combined with result index 0 should extract the plain string
    /// stored under the `hello` key.
    #[test]
    fn test_json_path() {
        let processor = JsonPathProcessor {
            json_path: JsonPath::try_from("$.hello").unwrap(),
            result_index: Some(0),
            ..Default::default()
        };
        let input = Value::Map(Map::one("hello", Value::String("world".to_string())));

        let extracted = processor.process_field(&input).unwrap();

        assert_eq!(extracted, Value::String("world".to_string()));
    }
}