1use std::fmt::{self, Display};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
20use arrow::compute;
21use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
22use datafusion_common::arrow::array::{
23 Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24 StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result};
28use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
29use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
30use datatypes::json::JsonStructureSettings;
31use jsonpath_rust::JsonPath;
32use serde_json::Value;
33
34use crate::function::{Function, extract_args};
35use crate::helper;
36
37fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
38 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
39 match json_path {
40 Ok(json_path) => {
41 let mut sub_jsonb = Vec::new();
42 let mut sub_offsets = Vec::new();
43 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
44 Ok(_) => Some(sub_jsonb),
45 Err(_) => None,
46 }
47 }
48 _ => None,
49 }
50}
51
/// Generates a scalar function type that extracts a value from JSONB data by JSON path
/// and converts it to a fixed Arrow type.
///
/// Macro parameters:
/// - `$name`: name of the generated struct; its snake_case form becomes the function name.
/// - `$type`: the `DataType` variant returned, also used to pick the `<$type>Builder`.
/// - `$rust_type`: suffix of the `jsonb::to_<rust_type>` conversion applied to the extracted value.
/// - `$doc`: doc string attached to the generated struct.
macro_rules! json_get {
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug)]
            pub struct $name {
                signature: Signature,
            }

            impl $name {
                // Function name: the struct name converted to snake_case.
                pub const NAME: &'static str = stringify!([<$name:snake>]);
            }

            impl Default for $name {
                fn default() -> Self {
                    Self {
                        // First list: JSON argument types; second list: path argument types.
                        // NOTE(review): presumably `one_of_sigs2` accepts any combination of
                        // the two lists — confirm against `helper::one_of_sigs2`.
                        signature: helper::one_of_sigs2(
                            vec![DataType::Binary, DataType::BinaryView],
                            vec![DataType::Utf8, DataType::Utf8View],
                        ),
                    }
                }
            }

            impl Function for $name {
                fn name(&self) -> &str {
                    Self::NAME
                }

                fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn invoke_with_args(
                    &self,
                    args: ScalarFunctionArgs,
                ) -> datafusion_common::Result<ColumnarValue> {
                    let [arg0, arg1] = extract_args(self.name(), &args)?;
                    // Normalize both inputs to view arrays so a single code path handles
                    // every accepted input type.
                    let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
                    let jsons = arg0.as_binary_view();
                    let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
                    let paths = arg1.as_string_view();

                    let size = jsons.len();
                    let mut builder = [<$type Builder>]::with_capacity(size);

                    for i in 0..size {
                        let json = jsons.is_valid(i).then(|| jsons.value(i));
                        let path = paths.is_valid(i).then(|| paths.value(i));
                        // The row is null when either input is null, the path lookup fails,
                        // or the extracted value cannot be converted to the target type.
                        let result = match (json, path) {
                            (Some(json), Some(path)) => {
                                get_json_by_path(json, path)
                                    .and_then(|json| { jsonb::[<to_ $rust_type>](&json).ok() })
                            }
                            _ => None,
                        };

                        builder.append_option(result);
                    }

                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
                }
            }

            impl Display for $name {
                // Displays the function name in upper case.
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    write!(f, "{}", Self::NAME.to_ascii_uppercase())
                }
            }
        }
    };
}
132
// `json_get_float`: extracted value converted with `jsonb::to_f64`, returned as `Float64`.
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);

// `json_get_bool`: extracted value converted with `jsonb::to_bool`, returned as `Boolean`.
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
146
/// A value located by a JSON path lookup, handed to a [`JsonGetResultBuilder`] for
/// conversion into the function's output type.
enum JsonResultValue<'a> {
    /// The extracted sub-value as JSONB bytes (produced by `jsonb_get`).
    Jsonb(Vec<u8>),
    /// A child column of the JSON struct array plus the row index to read from it
    /// (produced by `json_struct_get` when the path names a struct field directly).
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A `serde_json` value found by evaluating the JSON path (produced by
    /// `json_struct_get` when the path does not name a struct field).
    JsonStructByValue(&'a Value),
}
152
/// Accumulates per-row results of a JSON path lookup into an Arrow array of the
/// function's return type.
trait JsonGetResultBuilder {
    /// Converts `value` to the output type and appends it as the next row.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends a null as the next row.
    fn append_null(&mut self);

    /// Finishes the builder and returns the accumulated rows as an array.
    fn build(&mut self) -> ArrayRef;
}
160
/// Shared implementation used by `JsonGetString` and `JsonGetInt`; see `JsonGet::invoke`
/// for the supported JSON argument types (binary JSONB or struct arrays).
struct JsonGet {
    // Signature reported to the query engine by the wrapping functions.
    signature: Signature,
}
170
171impl JsonGet {
172 fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
173 where
174 F: Fn(usize) -> B,
175 B: JsonGetResultBuilder,
176 {
177 let [arg0, arg1] = extract_args("JSON_GET", &args)?;
178
179 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
180 let paths = arg1.as_string_view();
181
182 let mut builder = (builder_factory)(arg0.len());
183 match arg0.data_type() {
184 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
185 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
186 let jsons = arg0.as_binary_view();
187 jsonb_get(jsons, paths, &mut builder)?;
188 }
189 DataType::Struct(_) => {
190 let jsons = arg0.as_struct();
191 json_struct_get(jsons, paths, &mut builder)?
192 }
193 _ => {
194 return Err(DataFusionError::Execution(format!(
195 "JSON_GET not supported argument type {}",
196 arg0.data_type(),
197 )));
198 }
199 };
200
201 Ok(ColumnarValue::Array(builder.build()))
202 }
203}
204
impl Default for JsonGet {
    /// Builds the shared helper with a signature that accepts any two arguments;
    /// the type of the first argument is checked later, in `JsonGet::invoke`.
    fn default() -> Self {
        Self {
            signature: Signature::any(2, Volatility::Immutable),
        }
    }
}
212
/// Scalar function `json_get_string`: looks up a JSON path and returns the value as a string.
#[derive(Default)]
pub struct JsonGetString(JsonGet);

impl JsonGetString {
    /// Name under which the function is exposed.
    pub const NAME: &'static str = "json_get_string";
}
219
220impl Function for JsonGetString {
221 fn name(&self) -> &str {
222 Self::NAME
223 }
224
225 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
226 Ok(DataType::Utf8View)
227 }
228
229 fn signature(&self) -> &Signature {
230 &self.0.signature
231 }
232
233 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
234 struct StringResultBuilder(StringViewBuilder);
235
236 impl JsonGetResultBuilder for StringResultBuilder {
237 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
238 match value {
239 JsonResultValue::Jsonb(value) => {
240 self.0.append_option(jsonb::to_str(&value).ok())
241 }
242 JsonResultValue::JsonStructByColumn(column, i) => {
243 if let Some(v) = string_array_value_at_index(column, i) {
244 self.0.append_value(v);
245 } else {
246 self.0
247 .append_value(arrow_cast::display::array_value_to_string(
248 column, i,
249 )?);
250 }
251 }
252 JsonResultValue::JsonStructByValue(value) => {
253 if let Some(s) = value.as_str() {
254 self.0.append_value(s)
255 } else {
256 self.0.append_value(value.to_string())
257 }
258 }
259 }
260 Ok(())
261 }
262
263 fn append_null(&mut self) {
264 self.0.append_null();
265 }
266
267 fn build(&mut self) -> ArrayRef {
268 Arc::new(self.0.finish())
269 }
270 }
271
272 self.0.invoke(args, |len: usize| {
273 StringResultBuilder(StringViewBuilder::with_capacity(len))
274 })
275 }
276}
277
/// Scalar function `json_get_int`: looks up a JSON path and returns the value as a 64-bit integer.
#[derive(Default)]
pub struct JsonGetInt(JsonGet);

impl JsonGetInt {
    /// Name under which the function is exposed.
    pub const NAME: &'static str = "json_get_int";
}
284
285impl Function for JsonGetInt {
286 fn name(&self) -> &str {
287 Self::NAME
288 }
289
290 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
291 Ok(DataType::Int64)
292 }
293
294 fn signature(&self) -> &Signature {
295 &self.0.signature
296 }
297
298 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
299 struct IntResultBuilder(Int64Builder);
300
301 impl JsonGetResultBuilder for IntResultBuilder {
302 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
303 match value {
304 JsonResultValue::Jsonb(value) => {
305 self.0.append_option(jsonb::to_i64(&value).ok())
306 }
307 JsonResultValue::JsonStructByColumn(column, i) => {
308 self.0.append_option(int_array_value_at_index(column, i))
309 }
310 JsonResultValue::JsonStructByValue(value) => {
311 self.0.append_option(value.as_i64())
312 }
313 }
314 Ok(())
315 }
316
317 fn append_null(&mut self) {
318 self.0.append_null();
319 }
320
321 fn build(&mut self) -> ArrayRef {
322 Arc::new(self.0.finish())
323 }
324 }
325
326 self.0.invoke(args, |len: usize| {
327 IntResultBuilder(Int64Builder::with_capacity(len))
328 })
329 }
330}
331
332impl Display for JsonGetInt {
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 write!(f, "{}", Self::NAME.to_ascii_uppercase())
335 }
336}
337
338fn jsonb_get(
339 jsons: &BinaryViewArray,
340 paths: &StringViewArray,
341 builder: &mut impl JsonGetResultBuilder,
342) -> Result<()> {
343 let size = jsons.len();
344 for i in 0..size {
345 let json = jsons.is_valid(i).then(|| jsons.value(i));
346 let path = paths.is_valid(i).then(|| paths.value(i));
347 let result = match (json, path) {
348 (Some(json), Some(path)) => get_json_by_path(json, path),
349 _ => None,
350 };
351 if let Some(v) = result {
352 builder.append_value(JsonResultValue::Jsonb(v))?;
353 } else {
354 builder.append_null();
355 }
356 }
357 Ok(())
358}
359
360fn json_struct_get(
361 jsons: &StructArray,
362 paths: &StringViewArray,
363 builder: &mut impl JsonGetResultBuilder,
364) -> Result<()> {
365 let size = jsons.len();
366 for i in 0..size {
367 if jsons.is_null(i) || paths.is_null(i) {
368 builder.append_null();
369 continue;
370 }
371 let path = paths.value(i);
372
373 let field_path = path.trim().replace("$.", "");
375 let column = jsons.column_by_name(&field_path);
376
377 if let Some(column) = column {
378 builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
379 } else {
380 let Some(raw) = jsons
381 .column_by_name(JsonStructureSettings::RAW_FIELD)
382 .and_then(|x| string_array_value_at_index(x, i))
383 else {
384 builder.append_null();
385 continue;
386 };
387
388 let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
389 DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
390 })?;
391 let value = json_struct_to_value(raw, jsons, i)?;
394
395 match path.find(&value) {
396 Value::Null => builder.append_null(),
397 Value::Array(values) => match values.as_slice() {
398 [] => builder.append_null(),
399 [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
400 _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
401 },
402 value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
403 }
404 }
405 }
406
407 Ok(())
408}
409
410fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
411 let Ok(mut json) = Value::from_str(raw) else {
412 return Err(DataFusionError::Internal(format!(
413 "inner field '{}' is not a valid JSON string",
414 JsonStructureSettings::RAW_FIELD
415 )));
416 };
417
418 for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
419 if column_name == JsonStructureSettings::RAW_FIELD {
420 continue;
421 }
422
423 let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
424 {
425 let json_pointer = format!("/{}", json_object.replace(".", "/"));
426 (json_pointer, field)
427 } else {
428 ("".to_string(), column_name)
429 };
430 let Some(json_object) = json
431 .pointer_mut(&json_pointer)
432 .and_then(|x| x.as_object_mut())
433 else {
434 return Err(DataFusionError::Internal(format!(
435 "value at JSON pointer '{}' is not an object",
436 json_pointer
437 )));
438 };
439
440 macro_rules! insert {
441 ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
442 if let Some(value) = $column
443 .is_valid($i)
444 .then(|| serde_json::Value::from($column.value($i)))
445 {
446 $json_object.insert($field.to_string(), value);
447 }
448 }};
449 }
450
451 match column.data_type() {
452 DataType::Boolean => {
454 let column = column.as_boolean();
455 insert!(column, i, json_object, field);
456 }
457 DataType::Int64 => {
459 let column = column.as_primitive::<Int64Type>();
460 insert!(column, i, json_object, field);
461 }
462 DataType::UInt64 => {
463 let column = column.as_primitive::<UInt64Type>();
464 insert!(column, i, json_object, field);
465 }
466 DataType::Float64 => {
467 let column = column.as_primitive::<Float64Type>();
468 insert!(column, i, json_object, field);
469 }
470 DataType::Utf8 => {
472 let column = column.as_string::<i32>();
473 insert!(column, i, json_object, field);
474 }
475 DataType::LargeUtf8 => {
476 let column = column.as_string::<i64>();
477 insert!(column, i, json_object, field);
478 }
479 DataType::Utf8View => {
480 let column = column.as_string_view();
481 insert!(column, i, json_object, field);
482 }
483 _ => {
485 return Err(DataFusionError::NotImplemented(format!(
486 "{} is not yet supported to be executed with field {} of datatype {}",
487 JsonGetString::NAME,
488 column_name,
489 column.data_type()
490 )));
491 }
492 }
493 }
494 Ok(json)
495}
496
497impl Display for JsonGetString {
498 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
499 write!(f, "{}", Self::NAME.to_ascii_uppercase())
500 }
501}
502
/// Scalar function `json_get_object`: looks up a JSON path in JSONB data and returns the
/// value, still as JSONB, only when it is a JSON object.
pub(super) struct JsonGetObject {
    // Signature reported to the query engine.
    signature: Signature,
}

impl JsonGetObject {
    /// Name under which the function is exposed.
    const NAME: &'static str = "json_get_object";
}
511
512impl Default for JsonGetObject {
513 fn default() -> Self {
514 Self {
515 signature: helper::one_of_sigs2(
516 vec![
517 DataType::Binary,
518 DataType::LargeBinary,
519 DataType::BinaryView,
520 ],
521 vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
522 ),
523 }
524 }
525}
526
527impl Function for JsonGetObject {
528 fn name(&self) -> &str {
529 Self::NAME
530 }
531
532 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
533 Ok(DataType::BinaryView)
534 }
535
536 fn signature(&self) -> &Signature {
537 &self.signature
538 }
539
540 fn invoke_with_args(
541 &self,
542 args: ScalarFunctionArgs,
543 ) -> datafusion_common::Result<ColumnarValue> {
544 let [arg0, arg1] = extract_args(self.name(), &args)?;
545 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
546 let jsons = arg0.as_binary_view();
547 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
548 let paths = arg1.as_string_view();
549
550 let len = jsons.len();
551 let mut builder = BinaryViewBuilder::with_capacity(len);
552
553 for i in 0..len {
554 let json = jsons.is_valid(i).then(|| jsons.value(i));
555 let path = paths.is_valid(i).then(|| paths.value(i));
556 let result = if let (Some(json), Some(path)) = (json, path) {
557 let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
558 let mut data = Vec::new();
559 let mut offset = Vec::new();
560 jsonb::get_by_path(json, path, &mut data, &mut offset)
561 .map(|()| jsonb::is_object(&data).then_some(data))
562 });
563 result.map_err(|e| DataFusionError::Execution(e.to_string()))?
564 } else {
565 None
566 };
567 builder.append_option(result);
568 }
569
570 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
571 }
572}
573
574impl Display for JsonGetObject {
575 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576 write!(f, "{}", Self::NAME.to_ascii_uppercase())
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use std::sync::Arc;
583
584 use arrow::array::{Float64Array, Int64Array, StructArray};
585 use arrow_schema::Field;
586 use datafusion_common::ScalarValue;
587 use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
588 use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
589 use datatypes::types::parse_string_to_jsonb;
590 use serde_json::json;
591
592 use super::*;
593
    /// Builds a single-row JSON struct array used as a test fixture.
    ///
    /// Three values are stored in typed columns (`kind`, `payload.code` and
    /// `payload.result.time_cost`); the remaining part of the document
    /// (`payload.success` and `payload.result.error`) is kept as JSON text in the raw field.
    fn test_json_struct() -> ArrayRef {
        Arc::new(StructArray::new(
            vec![
                Field::new("kind", DataType::Utf8, true),
                Field::new("payload.code", DataType::Int64, true),
                Field::new("payload.result.time_cost", DataType::Float64, true),
                Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
            ]
            .into(),
            vec![
                Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
                Arc::new(Int64Array::from_iter([Some(404)])),
                Arc::new(Float64Array::from_iter([Some(1.234)])),
                Arc::new(StringViewArray::from_iter([Some(
                    json! ({
                        "payload": {
                            "success": false,
                            "result": {
                                "error": "not found"
                            }
                        }
                    })
                    .to_string(),
                )])),
            ],
            None,
        ))
    }
637
638 #[test]
639 fn test_json_get_int() {
640 let json_get_int = JsonGetInt::default();
641
642 assert_eq!("json_get_int", json_get_int.name());
643 assert_eq!(
644 DataType::Int64,
645 json_get_int
646 .return_type(&[DataType::Binary, DataType::Utf8])
647 .unwrap()
648 );
649
650 let json_strings = [
651 r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
652 r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
653 r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
654 ];
655 let json_struct = test_json_struct();
656
657 let path_expects = vec![
658 ("$.a.b", Some(2)),
659 ("$.a", Some(4)),
660 ("$.c", None),
661 ("$.kind", None),
662 ("$.payload.code", Some(404)),
663 ("$.payload.success", None),
664 ("$.payload.result.time_cost", None),
665 ("$.payload.not-exists", None),
666 ("$.not-exists", None),
667 ("$", None),
668 ];
669
670 let mut jsons = json_strings
671 .iter()
672 .map(|s| {
673 let value = jsonb::parse_value(s.as_bytes()).unwrap();
674 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
675 })
676 .collect::<Vec<_>>();
677 let json_struct_arrays =
678 std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
679 jsons.extend(json_struct_arrays);
680
681 for i in 0..jsons.len() {
682 let json = &jsons[i];
683 let (path, expect) = path_expects[i];
684
685 let args = ScalarFunctionArgs {
686 args: vec![
687 ColumnarValue::Array(json.clone()),
688 ColumnarValue::Scalar(path.into()),
689 ],
690 arg_fields: vec![],
691 number_rows: 1,
692 return_field: Arc::new(Field::new("x", DataType::Int64, false)),
693 config_options: Arc::new(Default::default()),
694 };
695 let result = json_get_int
696 .invoke_with_args(args)
697 .and_then(|x| x.to_array(1))
698 .unwrap();
699
700 let result = result.as_primitive::<Int64Type>();
701 assert_eq!(1, result.len());
702 let actual = result.is_valid(0).then(|| result.value(0));
703 assert_eq!(actual, expect);
704 }
705 }
706
707 #[test]
708 fn test_json_get_float() {
709 let json_get_float = JsonGetFloat::default();
710
711 assert_eq!("json_get_float", json_get_float.name());
712 assert_eq!(
713 DataType::Float64,
714 json_get_float
715 .return_type(&[DataType::Binary, DataType::Utf8])
716 .unwrap()
717 );
718
719 let json_strings = [
720 r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
721 r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
722 r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
723 ];
724 let paths = vec!["$.a.b", "$.a", "$.c"];
725 let results = [Some(2.1), Some(4.4), None];
726
727 let jsonbs = json_strings
728 .iter()
729 .map(|s| {
730 let value = jsonb::parse_value(s.as_bytes()).unwrap();
731 value.to_vec()
732 })
733 .collect::<Vec<_>>();
734
735 let args = ScalarFunctionArgs {
736 args: vec![
737 ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
738 ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
739 ],
740 arg_fields: vec![],
741 number_rows: 3,
742 return_field: Arc::new(Field::new("x", DataType::Float64, false)),
743 config_options: Arc::new(Default::default()),
744 };
745 let result = json_get_float
746 .invoke_with_args(args)
747 .and_then(|x| x.to_array(3))
748 .unwrap();
749 let vector = result.as_primitive::<Float64Type>();
750
751 assert_eq!(3, vector.len());
752 for (i, gt) in results.iter().enumerate() {
753 let result = vector.is_valid(i).then(|| vector.value(i));
754 assert_eq!(*gt, result);
755 }
756 }
757
758 #[test]
759 fn test_json_get_bool() {
760 let json_get_bool = JsonGetBool::default();
761
762 assert_eq!("json_get_bool", json_get_bool.name());
763 assert_eq!(
764 DataType::Boolean,
765 json_get_bool
766 .return_type(&[DataType::Binary, DataType::Utf8])
767 .unwrap()
768 );
769
770 let json_strings = [
771 r#"{"a": {"b": true}, "b": false, "c": true}"#,
772 r#"{"a": false, "b": {"c": true}, "c": false}"#,
773 r#"{"a": true, "b": false, "c": {"a": true}}"#,
774 ];
775 let paths = vec!["$.a.b", "$.a", "$.c"];
776 let results = [Some(true), Some(false), None];
777
778 let jsonbs = json_strings
779 .iter()
780 .map(|s| {
781 let value = jsonb::parse_value(s.as_bytes()).unwrap();
782 value.to_vec()
783 })
784 .collect::<Vec<_>>();
785
786 let args = ScalarFunctionArgs {
787 args: vec![
788 ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
789 ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
790 ],
791 arg_fields: vec![],
792 number_rows: 3,
793 return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
794 config_options: Arc::new(Default::default()),
795 };
796 let result = json_get_bool
797 .invoke_with_args(args)
798 .and_then(|x| x.to_array(3))
799 .unwrap();
800 let vector = result.as_boolean();
801
802 assert_eq!(3, vector.len());
803 for (i, gt) in results.iter().enumerate() {
804 let result = vector.is_valid(i).then(|| vector.value(i));
805 assert_eq!(*gt, result);
806 }
807 }
808
809 #[test]
810 fn test_json_get_string() {
811 let json_get_string = JsonGetString::default();
812
813 assert_eq!("json_get_string", json_get_string.name());
814 assert_eq!(
815 DataType::Utf8View,
816 json_get_string
817 .return_type(&[DataType::Binary, DataType::Utf8])
818 .unwrap()
819 );
820
821 let json_strings = [
822 r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
823 r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
824 r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
825 ];
826 let json_struct = test_json_struct();
827
828 let paths = vec![
829 "$.a.b",
830 "$.a",
831 "",
832 "$.kind",
833 "$.payload.code",
834 "$.payload.result.time_cost",
835 "$.payload",
836 "$.payload.success",
837 "$.payload.result",
838 "$.payload.result.error",
839 "$.payload.result.not-exists",
840 "$.payload.not-exists",
841 "$.not-exists",
842 "$",
843 ];
844 let expects = [
845 Some("a"),
846 Some("d"),
847 None,
848 Some("foo"),
849 Some("404"),
850 Some("1.234"),
851 Some(
852 r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
853 ),
854 Some("false"),
855 Some(r#"{"error":"not found","time_cost":1.234}"#),
856 Some("not found"),
857 None,
858 None,
859 None,
860 Some(
861 r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
862 ),
863 ];
864
865 let mut jsons = json_strings
866 .iter()
867 .map(|s| {
868 let value = jsonb::parse_value(s.as_bytes()).unwrap();
869 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
870 })
871 .collect::<Vec<_>>();
872 let json_struct_arrays =
873 std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
874 jsons.extend(json_struct_arrays);
875
876 for i in 0..jsons.len() {
877 let json = &jsons[i];
878 let path = paths[i];
879 let expect = expects[i];
880
881 let args = ScalarFunctionArgs {
882 args: vec![
883 ColumnarValue::Array(json.clone()),
884 ColumnarValue::Scalar(path.into()),
885 ],
886 arg_fields: vec![],
887 number_rows: 1,
888 return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
889 config_options: Arc::new(Default::default()),
890 };
891 let result = json_get_string
892 .invoke_with_args(args)
893 .and_then(|x| x.to_array(1))
894 .unwrap();
895
896 let result = result.as_string_view();
897 assert_eq!(1, result.len());
898 let actual = result.is_valid(0).then(|| result.value(0));
899 assert_eq!(actual, expect);
900 }
901 }
902
903 #[test]
904 fn test_json_get_object() -> Result<()> {
905 let udf = JsonGetObject::default();
906 assert_eq!("json_get_object", udf.name());
907 assert_eq!(
908 DataType::BinaryView,
909 udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
910 );
911
912 let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
913 let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
914 let number_rows = paths.len();
915
916 let args = ScalarFunctionArgs {
917 args: vec![
918 ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
919 ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
920 ],
921 arg_fields: vec![],
922 number_rows,
923 return_field: Arc::new(Field::new("x", DataType::Binary, false)),
924 config_options: Arc::new(Default::default()),
925 };
926 let result = udf
927 .invoke_with_args(args)
928 .and_then(|x| x.to_array(number_rows))?;
929 let result = result.as_binary_view();
930
931 let expected = &BinaryViewArray::from_iter(
932 vec![
933 Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
934 Some(r#"{"b": {"c": {"d": 1}}}"#),
935 Some(r#"{"c": {"d": 1}}"#),
936 Some(r#"{"d": 1}"#),
937 None,
938 None,
939 None,
940 ]
941 .into_iter()
942 .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
943 );
944 assert_eq!(result, expected);
945 Ok(())
946 }
947}