pipeline/etl/processor/
json_path.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use jsonpath_rust::JsonPath;
16use snafu::{OptionExt, ResultExt};
17
18use crate::error::{
19    Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
20    ProcessorMissingFieldSnafu, Result,
21};
22use crate::etl::field::Fields;
23use crate::etl::processor::{
24    yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
25    FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
26};
27use crate::Value;
28
29pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";
30
31impl TryFrom<&yaml_rust::yaml::Hash> for JsonPathProcessor {
32    type Error = Error;
33
34    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
35        let mut fields = Fields::default();
36        let mut ignore_missing = false;
37        let mut json_path = None;
38        let mut result_idex = None;
39
40        for (k, v) in value.iter() {
41            let key = k
42                .as_str()
43                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
44            match key {
45                FIELD_NAME => {
46                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
47                }
48                FIELDS_NAME => {
49                    fields = yaml_new_fields(v, FIELDS_NAME)?;
50                }
51
52                IGNORE_MISSING_NAME => {
53                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
54                }
55                JSON_PATH_RESULT_INDEX_NAME => {
56                    result_idex = Some(v.as_i64().context(JsonPathParseResultIndexSnafu)? as usize);
57                }
58
59                JSON_PATH_NAME => {
60                    let json_path_str = yaml_string(v, JSON_PATH_NAME)?;
61                    json_path = Some(
62                        JsonPath::try_from(json_path_str.as_str()).context(JsonPathParseSnafu)?,
63                    );
64                }
65
66                _ => {}
67            }
68        }
69
70        let processor = JsonPathProcessor {
71            fields,
72            json_path: json_path.context(ProcessorMissingFieldSnafu {
73                processor: PROCESSOR_JSON_PATH,
74                field: JSON_PATH_NAME,
75            })?,
76            ignore_missing,
77            result_index: result_idex,
78        };
79
80        Ok(processor)
81    }
82}
83
/// Processor that evaluates a JSONPath expression against the value of each
/// configured field and stores the query result back into the pipeline map.
#[derive(Debug)]
pub struct JsonPathProcessor {
    /// Fields to process; each supplies the input key to read and the key the
    /// result is written to (the target, or the input key itself).
    fields: Fields,
    /// Parsed JSONPath expression applied to every input value.
    json_path: JsonPath<Value>,
    /// When `true`, a field that is absent from the pipeline map is skipped
    /// instead of producing a missing-field error.
    ignore_missing: bool,
    /// Optional position to pick from an array result; an out-of-range
    /// position yields `Value::Null`. When `None`, the result is kept as is.
    result_index: Option<usize>,
}
91
92impl Default for JsonPathProcessor {
93    fn default() -> Self {
94        JsonPathProcessor {
95            fields: Fields::default(),
96            json_path: JsonPath::try_from("$").unwrap(),
97            ignore_missing: false,
98            result_index: None,
99        }
100    }
101}
102
103impl JsonPathProcessor {
104    fn process_field(&self, val: &Value) -> Result<Value> {
105        let processed = self.json_path.find(val);
106        match processed {
107            Value::Array(arr) => {
108                if let Some(index) = self.result_index {
109                    Ok(arr.get(index).cloned().unwrap_or(Value::Null))
110                } else {
111                    Ok(Value::Array(arr))
112                }
113            }
114            v => Ok(v),
115        }
116    }
117}
118
119impl Processor for JsonPathProcessor {
120    fn kind(&self) -> &str {
121        PROCESSOR_JSON_PATH
122    }
123
124    fn ignore_missing(&self) -> bool {
125        self.ignore_missing
126    }
127
128    fn exec_mut(&self, val: &mut PipelineMap) -> Result<()> {
129        for field in self.fields.iter() {
130            let index = field.input_field();
131            match val.get(index) {
132                Some(v) => {
133                    let processed = self.process_field(v)?;
134                    let output_index = field.target_or_input_field();
135                    val.insert(output_index.to_string(), processed);
136                }
137                None => {
138                    if !self.ignore_missing {
139                        return ProcessorMissingFieldSnafu {
140                            processor: self.kind(),
141                            field: field.input_field(),
142                        }
143                        .fail();
144                    }
145                }
146            }
147        }
148        Ok(())
149    }
150}
151
#[cfg(test)]
mod test {
    use super::*;
    use crate::{Map, Value};

    /// `$.hello` on `{"hello": "world"}` with result index 0 yields `"world"`.
    #[test]
    fn test_json_path() {
        let input = Value::Map(Map::one("hello", Value::String("world".to_string())));

        let processor = JsonPathProcessor {
            json_path: JsonPath::try_from("$.hello").unwrap(),
            result_index: Some(0),
            ..Default::default()
        };

        let extracted = processor.process_field(&input).unwrap();
        assert_eq!(extracted, Value::String("world".to_string()));
    }
}
176}