common_function/scalars/json/
json_path_exists.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};
21use datafusion_common::arrow::datatypes::DataType;
22use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
23
24use crate::function::{Function, extract_args};
25use crate::helper;
26
27/// Check if the given JSON data contains the given JSON path.
28#[derive(Clone, Debug)]
29pub(crate) struct JsonPathExistsFunction {
30    signature: Signature,
31}
32
33impl Default for JsonPathExistsFunction {
34    fn default() -> Self {
35        Self {
36            // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
37            signature: helper::one_of_sigs2(
38                vec![DataType::Binary, DataType::BinaryView, DataType::Null],
39                vec![DataType::Utf8, DataType::Utf8View, DataType::Null],
40            ),
41        }
42    }
43}
44
/// Name reported by `JsonPathExistsFunction`'s `Function::name` implementation.
const NAME: &str = "json_path_exists";
46
47impl Function for JsonPathExistsFunction {
48    fn name(&self) -> &str {
49        NAME
50    }
51
52    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
53        Ok(DataType::Boolean)
54    }
55
56    fn signature(&self) -> &Signature {
57        &self.signature
58    }
59
60    fn invoke_with_args(
61        &self,
62        args: ScalarFunctionArgs,
63    ) -> datafusion_common::Result<ColumnarValue> {
64        let [jsons, paths] = extract_args(self.name(), &args)?;
65
66        let size = jsons.len();
67        let mut builder = BooleanBuilder::with_capacity(size);
68
69        match (jsons.data_type(), paths.data_type()) {
70            (DataType::Null, _) | (_, DataType::Null) => builder.append_nulls(size),
71            _ => {
72                let jsons = compute::cast(&jsons, &DataType::BinaryView)?;
73                let jsons = jsons.as_binary_view();
74                let paths = compute::cast(&paths, &DataType::Utf8View)?;
75                let paths = paths.as_string_view();
76                for i in 0..size {
77                    let json = jsons.is_valid(i).then(|| jsons.value(i));
78                    let path = paths.is_valid(i).then(|| paths.value(i));
79                    let result = match (json, path) {
80                        (Some(json), Some(path)) => {
81                            // Get `JsonPath`.
82                            let json_path = match jsonb::jsonpath::parse_json_path(path.as_bytes())
83                            {
84                                Ok(json_path) => json_path,
85                                Err(e) => {
86                                    return Err(DataFusionError::Execution(format!(
87                                        "invalid json path '{path}': {e}"
88                                    )));
89                                }
90                            };
91                            jsonb::path_exists(json, json_path).ok()
92                        }
93                        _ => None,
94                    };
95
96                    builder.append_option(result);
97                }
98            }
99        }
100
101        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
102    }
103}
104
105impl Display for JsonPathExistsFunction {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        write!(f, "JSON_PATH_EXISTS")
108    }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::{BinaryArray, NullArray, StringArray};

    use super::*;

    /// Parses JSON text with `jsonb::parse_value` and returns its serialized bytes.
    fn to_jsonb(text: &str) -> Vec<u8> {
        jsonb::parse_value(text.as_bytes()).unwrap().to_vec()
    }

    /// Calls `function` with one JSON column and one path column of `rows` rows and
    /// converts the outcome into an array.
    fn invoke(
        function: &JsonPathExistsFunction,
        jsons: Arc<dyn Array>,
        paths: Arc<dyn Array>,
        rows: usize,
    ) -> datafusion_common::Result<Arc<dyn Array>> {
        let args = ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(jsons), ColumnarValue::Array(paths)],
            arg_fields: vec![],
            number_rows: rows,
            return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
            config_options: Arc::new(Default::default()),
        };
        function
            .invoke_with_args(args)
            .and_then(|value| value.to_array(rows))
    }

    #[test]
    fn test_json_path_exists_function() {
        let json_path_exists = JsonPathExistsFunction::default();

        assert_eq!("json_path_exists", json_path_exists.name());
        assert_eq!(
            DataType::Boolean,
            json_path_exists.return_type(&[DataType::Binary]).unwrap()
        );

        // One row per case: (JSON document, JSON path, expected result).
        let cases = [
            (r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, "$.a.b.c", false),
            (r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, "$.b", true),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, "$.c.a", true),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, ".d", false),
            (r#"[1, 2, 3]"#, "$[0]", true),
            (r#"null"#, "$.a", false),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, "null", false),
            (r#"null"#, "null", false),
        ];

        let jsonbs = cases
            .iter()
            .map(|&(json, _, _)| to_jsonb(json))
            .collect::<Vec<_>>();
        let paths = cases
            .iter()
            .map(|&(_, path, _)| path)
            .collect::<Vec<_>>();
        let expected = cases
            .iter()
            .map(|&(_, _, want)| want)
            .collect::<Vec<_>>();

        let result = invoke(
            &json_path_exists,
            Arc::new(BinaryArray::from_iter_values(jsonbs)),
            Arc::new(StringArray::from_iter_values(paths)),
            8,
        )
        .unwrap();
        let vector = result.as_boolean();

        // Test for non-nulls.
        assert_eq!(8, vector.len());
        let actual = (0..expected.len())
            .map(|row| vector.value(row))
            .collect::<Vec<_>>();
        assert_eq!(actual, expected);

        // Test for path error.
        let illegal_path = "$..a";
        let outcome = invoke(
            &json_path_exists,
            Arc::new(BinaryArray::from_iter_values(vec![to_jsonb("{}")])),
            Arc::new(StringArray::from_iter_values(vec![illegal_path])),
            1,
        );
        assert!(outcome.is_err());

        // Test for nulls.
        let json = Arc::new(BinaryArray::from_iter_values(vec![to_jsonb("{}")]));
        let null_json = Arc::new(NullArray::new(1));

        let path = Arc::new(StringArray::from_iter_values(vec!["$.a"]));
        let null_path = Arc::new(NullArray::new(1));

        let result = invoke(&json_path_exists, null_json, path, 1).unwrap();
        let result1 = result.as_boolean();

        let result = invoke(&json_path_exists, json, null_path, 1).unwrap();
        let result2 = result.as_boolean();

        assert_eq!(result1.len(), 1);
        assert!(result1.is_null(0));
        assert_eq!(result2.len(), 1);
        assert!(result2.is_null(0));
    }
}