pipeline/etl/processor/
json_parse.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::{OptionExt as _, ResultExt};
16
17use crate::error::{
18    Error, FieldMustBeTypeSnafu, JsonParseSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu,
19    ProcessorUnsupportedValueSnafu, Result,
20};
21use crate::etl::field::Fields;
22use crate::etl::processor::{
23    yaml_bool, yaml_new_field, yaml_new_fields, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME,
24};
25use crate::{json_to_map, PipelineMap, Processor, Value};
26
/// Name of the JSON-parse processor, as returned by `JsonParseProcessor::kind`.
///
/// NOTE(review): presumably also the key that selects this processor in a
/// pipeline definition — confirm against the processor registry.
pub(crate) const PROCESSOR_JSON_PARSE: &str = "json_parse";
28
/// Processor that parses JSON text stored in the configured fields and
/// replaces it with the decoded structure (a map or an array).
#[derive(Debug, Default)]
pub struct JsonParseProcessor {
    /// Fields to process; each one supplies an input key and an output key
    /// (the target if set, otherwise the input — see `target_or_input_field`).
    fields: Fields,
    /// When `true`, an input field missing from the pipeline map is skipped
    /// instead of being reported as an error.
    ignore_missing: bool,
}
34
35impl TryFrom<&yaml_rust::yaml::Hash> for JsonParseProcessor {
36    type Error = Error;
37
38    fn try_from(value: &yaml_rust::yaml::Hash) -> std::result::Result<Self, Self::Error> {
39        let mut fields = Fields::default();
40        let mut ignore_missing = false;
41
42        for (k, v) in value.iter() {
43            let key = k
44                .as_str()
45                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
46            match key {
47                FIELD_NAME => {
48                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
49                }
50                FIELDS_NAME => {
51                    fields = yaml_new_fields(v, FIELDS_NAME)?;
52                }
53                IGNORE_MISSING_NAME => {
54                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
55                }
56                _ => {}
57            }
58        }
59
60        let processor = JsonParseProcessor {
61            fields,
62            ignore_missing,
63        };
64
65        Ok(processor)
66    }
67}
68
69impl JsonParseProcessor {
70    fn process_field(&self, val: &Value) -> Result<Value> {
71        let Some(json_str) = val.as_str() else {
72            return FieldMustBeTypeSnafu {
73                field: val.to_str_type(),
74                ty: "string",
75            }
76            .fail();
77        };
78        let parsed: serde_json::Value = serde_json::from_str(json_str).context(JsonParseSnafu)?;
79        match parsed {
80            serde_json::Value::Object(_) => Ok(Value::Map(json_to_map(parsed)?.into())),
81            serde_json::Value::Array(arr) => Ok(Value::Array(arr.try_into()?)),
82            _ => ProcessorUnsupportedValueSnafu {
83                processor: self.kind(),
84                val: val.to_str_type(),
85            }
86            .fail(),
87        }
88    }
89}
90
91impl Processor for JsonParseProcessor {
92    fn kind(&self) -> &str {
93        PROCESSOR_JSON_PARSE
94    }
95
96    fn ignore_missing(&self) -> bool {
97        self.ignore_missing
98    }
99
100    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
101        for field in self.fields.iter() {
102            let index = field.input_field();
103            match val.get(index) {
104                Some(v) => {
105                    let processed = self.process_field(v)?;
106                    let output_index = field.target_or_input_field();
107                    val.insert(output_index.to_string(), processed);
108                }
109                None => {
110                    if !self.ignore_missing {
111                        return ProcessorMissingFieldSnafu {
112                            processor: self.kind(),
113                            field: field.input_field(),
114                        }
115                        .fail();
116                    }
117                }
118            }
119        }
120        Ok(val)
121    }
122}
123
#[cfg(test)]
mod test {
    use super::*;
    use crate::Value;

    /// A JSON object string is decoded into a map with the same entry.
    #[test]
    fn test_json_parse() {
        let input = Value::String(r#"{"hello": "world"}"#.to_string());
        let expected = Value::Map(crate::Map::one(
            "hello".to_string(),
            Value::String("world".to_string()),
        ));

        let actual = JsonParseProcessor::default()
            .process_field(&input)
            .unwrap();

        assert_eq!(actual, expected);
    }
}