pipeline/etl/processor/
json_path.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use jsonpath_rust::JsonPath;
16use snafu::{OptionExt, ResultExt};
17use vrl::value::{KeyString, Value as VrlValue};
18
19use crate::error::{
20    Error, JsonParseSnafu, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
21    ProcessorMissingFieldSnafu, Result, ValueMustBeMapSnafu,
22};
23use crate::etl::field::Fields;
24use crate::etl::processor::{
25    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, Processor, FIELDS_NAME, FIELD_NAME,
26    IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
27};
28
/// Kind name of the JSON path processor: returned by `Processor::kind` and used as the
/// processor name when a required configuration field is reported missing.
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
30
31impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessor {
32    type Error = Error;
33
34    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
35        let mut fields = Fields::default();
36        let mut ignore_missing = false;
37        let mut json_path = None;
38        let mut result_idex = None;
39
40        for (k, v) in value.iter() {
41            let key = k
42                .as_str()
43                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
44            match key {
45                FIELD_NAME => {
46                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
47                }
48                FIELDS_NAME => {
49                    fields = yaml_new_fields(v, FIELDS_NAME)?;
50                }
51
52                IGNORE_MISSING_NAME => {
53                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
54                }
55                JSON_PATH_RESULT_INDEX_NAME => {
56                    result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
57                }
58
59                JSON_PATH_NAME => {
60                    let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
61                    json_path = Some(
62                        JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
63                    );
64                }
65
66                _ => {}
67            }
68        }
69
70        let processor = JsonPathProcessor {
71            fields,
72            json_path: json_path.context(ProcessorMissingFieldSnafu {
73                processor: PROCESSOR_JSON_PATH,
74                field: JSON_PATH_NAME,
75            })?,
76            ignore_missing,
77            result_index: result_idex,
78        };
79
80        Ok(processor)
81    }
82}
83
/// Processor that evaluates a JSON path expression against the value of each configured
/// field and stores the outcome under the field's target (or input) name.
#[derive(Debug)]
pub struct JsonPathProcessor {
    /// Fields whose values are run through the JSON path expression.
    fields: Fields,
    /// Parsed JSON path expression applied to every input value.
    json_path: JsonPath<serde_json::Value>,
    /// When `true`, an input field that is absent from the map is skipped instead of
    /// producing a missing-field error.
    ignore_missing: bool,
    /// When set and the path result is an array, only the element at this position is
    /// kept (null if out of range); when `None`, the whole result array is kept.
    result_index: Option<usize>,
}
91
92impl Default for JsonPathProcessor {
93    fn default() -> Self {
94        JsonPathProcessor {
95            fields: Fields::default(),
96            json_path: JsonPath::try_from("$").unwrap(),
97            ignore_missing: false,
98            result_index: None,
99        }
100    }
101}
102
103impl JsonPathProcessor {
104    fn process_field(&self, val: &VrlValue) -> Result<VrlValue> {
105        let v = serde_json::to_value(val).context(JsonParseSnafu)?;
106        let p = self.json_path.find(&v);
107        match p {
108            serde_json::Value::Array(arr) => {
109                if let Some(index) = self.result_index {
110                    Ok(arr
111                        .get(index)
112                        .cloned()
113                        .map(|v| v.into())
114                        .unwrap_or(VrlValue::Null))
115                } else {
116                    Ok(VrlValue::Array(arr.into_iter().map(|v| v.into()).collect()))
117                }
118            }
119            v => Ok(v.into()),
120        }
121    }
122}
123
124impl Processor for JsonPathProcessor {
125    fn kind(&self) -> &str {
126        PROCESSOR_JSON_PATH
127    }
128
129    fn ignore_missing(&self) -> bool {
130        self.ignore_missing
131    }
132
133    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
134        for field in self.fields.iter() {
135            let index = field.input_field();
136            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
137            match val.get(index) {
138                Some(v) => {
139                    let processed = self.process_field(v)?;
140                    let output_index = field.target_or_input_field();
141                    val.insert(KeyString::from(output_index), processed);
142                }
143                None => {
144                    if !self.ignore_missing {
145                        return ProcessorMissingFieldSnafu {
146                            processor: self.kind(),
147                            field: field.input_field(),
148                        }
149                        .fail();
150                    }
151                }
152            }
153        }
154        Ok(val)
155    }
156}
157
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use vrl::prelude::Bytes;

    /// Selecting `$.hello` with result index 0 from `{"hello": "world"}` yields the
    /// string `world`.
    #[test]
    fn test_json_path() {
        use super::*;

        let processor = JsonPathProcessor {
            json_path: JsonPath::try_from("$.hello").unwrap(),
            result_index: Some(0),
            ..Default::default()
        };

        // Build the input object `{"hello": "world"}`.
        let mut object = BTreeMap::new();
        object.insert(
            KeyString::from("hello"),
            VrlValue::Bytes(Bytes::from("world")),
        );
        let input = VrlValue::Object(object);

        let extracted = processor.process_field(&input).unwrap();
        let expected = VrlValue::Bytes(Bytes::from("world"));
        assert_eq!(extracted, expected);
    }
}
183}