1use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{
21 Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
22 StringViewBuilder,
23};
24use datafusion_common::arrow::datatypes::DataType;
25use datafusion_expr::type_coercion::aggregates::STRINGS;
26use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
27
28use crate::function::{Function, extract_args};
29use crate::helper;
30
31fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
32 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
33 match json_path {
34 Ok(json_path) => {
35 let mut sub_jsonb = Vec::new();
36 let mut sub_offsets = Vec::new();
37 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
38 Ok(_) => Some(sub_jsonb),
39 Err(_) => None,
40 }
41 }
42 _ => None,
43 }
44}
45
/// Generates a JSON-path getter scalar function called `$name`.
///
/// The generated struct implements `Default`, `Function` and `Display`. For each row it
/// looks up the path (second argument) inside the JSONB value (first argument) and converts
/// the match with `jsonb::to_<$rust_type>`. A row is `NULL` when either input is null, when
/// the path cannot be parsed or looked up, or when the conversion fails.
macro_rules! json_get {
    // $name:      identifier of the generated struct (its snake_case form is the SQL name)
    // $type:      Arrow `DataType` variant, also the prefix of the `<$type>Builder` used
    // $rust_type: suffix selecting the `jsonb::to_*` conversion function
    // $doc:       rustdoc text attached to the generated struct
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug)]
            pub struct $name {
                signature: Signature,
            }

            impl $name {
                pub const NAME: &'static str = stringify!([<$name:snake>]);
            }

            impl Default for $name {
                fn default() -> Self {
                    let json_types = vec![DataType::Binary, DataType::BinaryView];
                    let path_types = vec![DataType::Utf8, DataType::Utf8View];
                    Self {
                        signature: helper::one_of_sigs2(json_types, path_types),
                    }
                }
            }

            impl Function for $name {
                fn name(&self) -> &str {
                    Self::NAME
                }

                fn return_type(
                    &self,
                    _input_types: &[DataType],
                ) -> datafusion_common::Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn invoke_with_args(
                    &self,
                    args: ScalarFunctionArgs,
                ) -> datafusion_common::Result<ColumnarValue> {
                    let [json_arg, path_arg] = extract_args(self.name(), &args)?;
                    // Normalize both inputs to view arrays so a single code path
                    // serves every accepted input type.
                    let json_arg = compute::cast(&json_arg, &DataType::BinaryView)?;
                    let jsons = json_arg.as_binary_view();
                    let path_arg = compute::cast(&path_arg, &DataType::Utf8View)?;
                    let paths = path_arg.as_string_view();

                    let rows = jsons.len();
                    let mut builder = [<$type Builder>]::with_capacity(rows);

                    for row in 0..rows {
                        let json = jsons.is_valid(row).then(|| jsons.value(row));
                        let path = paths.is_valid(row).then(|| paths.value(row));
                        // Any missing input or failed step collapses to a null row.
                        let value = json
                            .zip(path)
                            .and_then(|(json, path)| get_json_by_path(json, path))
                            .and_then(|sub_json| jsonb::[<to_ $rust_type>](&sub_json).ok());

                        builder.append_option(value);
                    }

                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
                }
            }

            impl Display for $name {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    f.write_str(&Self::NAME.to_ascii_uppercase())
                }
            }
        }
    };
}
126
// `JsonGetInt` (function name `json_get_int`): returns `Int64`, converting the matched
// value with `jsonb::to_i64`.
json_get!(
    JsonGetInt,
    Int64,
    i64,
    "Get the value from the JSONB by the given path and return it as an integer."
);

// `JsonGetFloat` (function name `json_get_float`): returns `Float64`, converting the
// matched value with `jsonb::to_f64`.
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);

// `JsonGetBool` (function name `json_get_bool`): returns `Boolean`, converting the
// matched value with `jsonb::to_bool`.
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
147
/// Scalar function `json_get_string`: gets the value from the JSONB by the given path and
/// returns it as a string (`Utf8View`).
#[derive(Clone, Debug)]
pub struct JsonGetString {
    /// Argument type combinations this function accepts.
    signature: Signature,
}
153
impl JsonGetString {
    /// Name under which the function is exposed; `Display` prints it upper-cased.
    pub const NAME: &'static str = "json_get_string";
}
157
158impl Default for JsonGetString {
159 fn default() -> Self {
160 Self {
161 signature: helper::one_of_sigs2(
163 vec![DataType::Binary, DataType::BinaryView],
164 vec![DataType::Utf8, DataType::Utf8View],
165 ),
166 }
167 }
168}
169
170impl Function for JsonGetString {
171 fn name(&self) -> &str {
172 Self::NAME
173 }
174
175 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
176 Ok(DataType::Utf8View)
177 }
178
179 fn signature(&self) -> &Signature {
180 &self.signature
181 }
182
183 fn invoke_with_args(
184 &self,
185 args: ScalarFunctionArgs,
186 ) -> datafusion_common::Result<ColumnarValue> {
187 let [arg0, arg1] = extract_args(self.name(), &args)?;
188 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
189 let jsons = arg0.as_binary_view();
190 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
191 let paths = arg1.as_string_view();
192
193 let size = jsons.len();
194 let mut builder = StringViewBuilder::with_capacity(size);
195
196 for i in 0..size {
197 let json = jsons.is_valid(i).then(|| jsons.value(i));
198 let path = paths.is_valid(i).then(|| paths.value(i));
199 let result = match (json, path) {
200 (Some(json), Some(path)) => {
201 get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
202 }
203 _ => None,
204 };
205 builder.append_option(result);
206 }
207
208 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
209 }
210}
211
212impl Display for JsonGetString {
213 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
214 write!(f, "{}", Self::NAME.to_ascii_uppercase())
215 }
216}
217
218pub(super) struct JsonGetObject {
220 signature: Signature,
221}
222
impl JsonGetObject {
    /// Name under which the function is exposed; `Display` prints it upper-cased.
    const NAME: &'static str = "json_get_object";
}
226
227impl Default for JsonGetObject {
228 fn default() -> Self {
229 Self {
230 signature: helper::one_of_sigs2(
231 vec![
232 DataType::Binary,
233 DataType::LargeBinary,
234 DataType::BinaryView,
235 ],
236 STRINGS.to_vec(),
237 ),
238 }
239 }
240}
241
242impl Function for JsonGetObject {
243 fn name(&self) -> &str {
244 Self::NAME
245 }
246
247 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
248 Ok(DataType::BinaryView)
249 }
250
251 fn signature(&self) -> &Signature {
252 &self.signature
253 }
254
255 fn invoke_with_args(
256 &self,
257 args: ScalarFunctionArgs,
258 ) -> datafusion_common::Result<ColumnarValue> {
259 let [arg0, arg1] = extract_args(self.name(), &args)?;
260 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
261 let jsons = arg0.as_binary_view();
262 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
263 let paths = arg1.as_string_view();
264
265 let len = jsons.len();
266 let mut builder = BinaryViewBuilder::with_capacity(len);
267
268 for i in 0..len {
269 let json = jsons.is_valid(i).then(|| jsons.value(i));
270 let path = paths.is_valid(i).then(|| paths.value(i));
271 let result = if let (Some(json), Some(path)) = (json, path) {
272 let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
273 let mut data = Vec::new();
274 let mut offset = Vec::new();
275 jsonb::get_by_path(json, path, &mut data, &mut offset)
276 .map(|()| jsonb::is_object(&data).then_some(data))
277 });
278 result.map_err(|e| DataFusionError::Execution(e.to_string()))?
279 } else {
280 None
281 };
282 builder.append_option(result);
283 }
284
285 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
286 }
287}
288
289impl Display for JsonGetObject {
290 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291 write!(f, "{}", Self::NAME.to_ascii_uppercase())
292 }
293}
294
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::ScalarValue;
    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
    use datatypes::types::parse_string_to_jsonb;

    use super::*;

    /// Assembles the arguments for one invocation: `json` is passed through as the first
    /// argument, `paths` becomes a string array for the second, and there is one output
    /// row per path. `return_type` is the type declared for the return field.
    fn new_args(json: ColumnarValue, paths: &[&str], return_type: DataType) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![
                json,
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(
                    paths.iter().copied(),
                ))),
            ],
            arg_fields: vec![],
            number_rows: paths.len(),
            return_field: Arc::new(Field::new("x", return_type, false)),
            config_options: Arc::new(Default::default()),
        }
    }

    /// Encodes every JSON text as JSONB, pairs it row-wise with `paths`, runs `function`
    /// and returns the resulting array.
    fn eval(
        function: &impl Function,
        json_strings: &[&str],
        paths: &[&str],
        return_type: DataType,
    ) -> Arc<dyn Array> {
        assert_eq!(json_strings.len(), paths.len());

        let jsonbs = json_strings
            .iter()
            .map(|s| jsonb::parse_value(s.as_bytes()).unwrap().to_vec());
        let jsons = ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs)));

        function
            .invoke_with_args(new_args(jsons, paths, return_type))
            .and_then(|x| x.to_array(paths.len()))
            .unwrap()
    }

    #[test]
    fn test_json_get_int() {
        let json_get_int = JsonGetInt::default();

        assert_eq!("json_get_int", json_get_int.name());
        assert_eq!(
            DataType::Int64,
            json_get_int
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let results = [Some(2), Some(4), None];

        let result = eval(&json_get_int, &json_strings, &paths, DataType::Int64);
        let vector = result.as_primitive::<Int64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_float() {
        let json_get_float = JsonGetFloat::default();

        assert_eq!("json_get_float", json_get_float.name());
        assert_eq!(
            DataType::Float64,
            json_get_float
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let results = [Some(2.1), Some(4.4), None];

        let result = eval(&json_get_float, &json_strings, &paths, DataType::Float64);
        let vector = result.as_primitive::<Float64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_bool() {
        let json_get_bool = JsonGetBool::default();

        assert_eq!("json_get_bool", json_get_bool.name());
        assert_eq!(
            DataType::Boolean,
            json_get_bool
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c"];
        let results = [Some(true), Some(false), None];

        let result = eval(&json_get_bool, &json_strings, &paths, DataType::Boolean);
        let vector = result.as_boolean();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_string() {
        let json_get_string = JsonGetString::default();

        assert_eq!("json_get_string", json_get_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_get_string
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        // The last path is empty on purpose: the expected result for that row is null.
        let paths = ["$.a.b", "$.a", ""];
        let results = [Some("a"), Some("d"), None];

        let result = eval(&json_get_string, &json_strings, &paths, DataType::Utf8View);
        let vector = result.as_string_view();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_object() -> datafusion_common::Result<()> {
        let udf = JsonGetObject::default();
        assert_eq!("json_get_object", udf.name());
        assert_eq!(
            DataType::BinaryView,
            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
        );

        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
        let paths = ["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
        let number_rows = paths.len();

        // A single scalar document is evaluated against every path. The declared return
        // field matches the function's actual return type (`BinaryView`).
        let args = new_args(
            ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
            &paths,
            DataType::BinaryView,
        );
        let result = udf
            .invoke_with_args(args)
            .and_then(|x| x.to_array(number_rows))?;
        let result = result.as_binary_view();

        // Non-object matches (`$.a.b.c.d`) and missing paths are expected to be null.
        let expected = &BinaryViewArray::from_iter(
            vec![
                Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
                Some(r#"{"b": {"c": {"d": 1}}}"#),
                Some(r#"{"c": {"d": 1}}"#),
                Some(r#"{"d": 1}"#),
                None,
                None,
                None,
            ]
            .into_iter()
            .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
        );
        assert_eq!(result, expected);
        Ok(())
    }
}