common_function/scalars/json/
json_path_exists.rs1use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};
21use datafusion_common::arrow::datatypes::DataType;
22use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
23
24use crate::function::{Function, extract_args};
25use crate::helper;
26
/// Scalar function `json_path_exists`: reports, row by row, whether a JSON
/// path matches anything inside a binary (jsonb-encoded) JSON value.
#[derive(Clone, Debug)]
pub(crate) struct JsonPathExistsFunction {
    /// Accepted argument type combinations, built once in `Default::default`.
    signature: Signature,
}
32
33impl Default for JsonPathExistsFunction {
34 fn default() -> Self {
35 Self {
36 signature: helper::one_of_sigs2(
38 vec![DataType::Binary, DataType::BinaryView, DataType::Null],
39 vec![DataType::Utf8, DataType::Utf8View, DataType::Null],
40 ),
41 }
42 }
43}
44
/// Name returned by `Function::name` for this function; also passed to
/// `extract_args` when unpacking the call arguments.
const NAME: &str = "json_path_exists";
46
47impl Function for JsonPathExistsFunction {
48 fn name(&self) -> &str {
49 NAME
50 }
51
52 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
53 Ok(DataType::Boolean)
54 }
55
56 fn signature(&self) -> &Signature {
57 &self.signature
58 }
59
60 fn invoke_with_args(
61 &self,
62 args: ScalarFunctionArgs,
63 ) -> datafusion_common::Result<ColumnarValue> {
64 let [jsons, paths] = extract_args(self.name(), &args)?;
65
66 let size = jsons.len();
67 let mut builder = BooleanBuilder::with_capacity(size);
68
69 match (jsons.data_type(), paths.data_type()) {
70 (DataType::Null, _) | (_, DataType::Null) => builder.append_nulls(size),
71 _ => {
72 let jsons = compute::cast(&jsons, &DataType::BinaryView)?;
73 let jsons = jsons.as_binary_view();
74 let paths = compute::cast(&paths, &DataType::Utf8View)?;
75 let paths = paths.as_string_view();
76 for i in 0..size {
77 let json = jsons.is_valid(i).then(|| jsons.value(i));
78 let path = paths.is_valid(i).then(|| paths.value(i));
79 let result = match (json, path) {
80 (Some(json), Some(path)) => {
81 let json_path = match jsonb::jsonpath::parse_json_path(path.as_bytes())
83 {
84 Ok(json_path) => json_path,
85 Err(e) => {
86 return Err(DataFusionError::Execution(format!(
87 "invalid json path '{path}': {e}"
88 )));
89 }
90 };
91 jsonb::path_exists(json, json_path).ok()
92 }
93 _ => None,
94 };
95
96 builder.append_option(result);
97 }
98 }
99 }
100
101 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
102 }
103}
104
105impl Display for JsonPathExistsFunction {
106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107 write!(f, "JSON_PATH_EXISTS")
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::{BinaryArray, NullArray, StringArray};

    use super::*;

    /// Encodes a JSON text into the jsonb byte form the function consumes.
    fn to_jsonb(text: &str) -> Vec<u8> {
        jsonb::parse_value(text.as_bytes()).unwrap().to_vec()
    }

    /// Wraps argument columns in the `ScalarFunctionArgs` used by every call
    /// in this test.
    fn call_args(columns: Vec<ColumnarValue>, rows: usize) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: columns,
            arg_fields: vec![],
            number_rows: rows,
            return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
            config_options: Arc::new(Default::default()),
        }
    }

    /// Invokes the function and materialises the result as an array of
    /// `rows` entries, panicking on any error.
    fn evaluate(
        function: &JsonPathExistsFunction,
        columns: Vec<ColumnarValue>,
        rows: usize,
    ) -> Arc<dyn Array> {
        function
            .invoke_with_args(call_args(columns, rows))
            .and_then(|value| value.to_array(rows))
            .unwrap()
    }

    #[test]
    fn test_json_path_exists_function() {
        let function = JsonPathExistsFunction::default();

        assert_eq!("json_path_exists", function.name());
        assert_eq!(
            DataType::Boolean,
            function.return_type(&[DataType::Binary]).unwrap()
        );

        // (json document, path, expected `value(i)` of the result)
        let cases = [
            (r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, "$.a.b.c", false),
            (r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, "$.b", true),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, "$.c.a", true),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, ".d", false),
            (r#"[1, 2, 3]"#, "$[0]", true),
            (r#"null"#, "$.a", false),
            (r#"{"a": 7, "b": 8, "c": {"a": 7}}"#, "null", false),
            (r#"null"#, "null", false),
        ];

        let documents: Vec<Vec<u8>> = cases.iter().map(|(json, _, _)| to_jsonb(json)).collect();
        let path_texts: Vec<&str> = cases.iter().map(|(_, path, _)| *path).collect();

        let output = evaluate(
            &function,
            vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(documents))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(path_texts))),
            ],
            8,
        );
        let output = output.as_boolean();

        assert_eq!(8, output.len());
        for (row, (_, _, want)) in cases.iter().enumerate() {
            assert_eq!(output.value(row), *want);
        }

        // A path that fails to parse must surface as an error.
        let outcome = function.invoke_with_args(call_args(
            vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(vec![to_jsonb(
                    "{}",
                )]))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(vec!["$..a"]))),
            ],
            1,
        ));
        assert!(outcome.is_err());

        // A Null-typed column on either side produces a null result.
        let json_column = Arc::new(BinaryArray::from_iter_values(vec![to_jsonb("{}")]));
        let null_json_column = Arc::new(NullArray::new(1));
        let path_column = Arc::new(StringArray::from_iter_values(vec!["$.a"]));
        let null_path_column = Arc::new(NullArray::new(1));

        let with_null_json = evaluate(
            &function,
            vec![
                ColumnarValue::Array(null_json_column),
                ColumnarValue::Array(path_column),
            ],
            1,
        );
        let with_null_json = with_null_json.as_boolean();

        let with_null_path = evaluate(
            &function,
            vec![
                ColumnarValue::Array(json_column),
                ColumnarValue::Array(null_path_column),
            ],
            1,
        );
        let with_null_path = with_null_path.as_boolean();

        assert_eq!(with_null_json.len(), 1);
        assert!(with_null_json.is_null(0));
        assert_eq!(with_null_path.len(), 1);
        assert!(with_null_path.is_null(0));
    }
}