1use std::fmt::{self, Display};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
20use arrow::compute;
21use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
22use datafusion_common::arrow::array::{
23 Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24 StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result};
28use datafusion_expr::type_coercion::aggregates::STRINGS;
29use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
30use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
31use datatypes::json::JsonStructureSettings;
32use jsonpath_rust::JsonPath;
33use serde_json::Value;
34
35use crate::function::{Function, extract_args};
36use crate::helper;
37
38fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
39 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
40 match json_path {
41 Ok(json_path) => {
42 let mut sub_jsonb = Vec::new();
43 let mut sub_offsets = Vec::new();
44 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
45 Ok(_) => Some(sub_jsonb),
46 Err(_) => None,
47 }
48 }
49 _ => None,
50 }
51}
52
/// Generates a scalar UDF that reads a value out of a JSONB binary column by a
/// JSON path and converts it to a primitive type.
///
/// Arguments:
/// - `$name`: name of the generated struct; its snake_case form is the SQL name.
/// - `$type`: Arrow `DataType` variant returned; `<$type>Builder` collects results.
/// - `$rust_type`: suffix of the `jsonb::to_<$rust_type>` converter applied to the
///   located value.
/// - `$doc`: doc string attached to the generated struct.
///
/// The generated function yields NULL when either input is NULL, the path does not
/// resolve, or the located value cannot be converted.
macro_rules! json_get {
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug)]
            pub struct $name {
                signature: Signature,
            }

            impl $name {
                pub const NAME: &'static str = stringify!([<$name:snake>]);
            }

            impl Default for $name {
                fn default() -> Self {
                    // First argument: JSONB bytes; second argument: the JSON path.
                    let signature = helper::one_of_sigs2(
                        vec![DataType::Binary, DataType::BinaryView],
                        vec![DataType::Utf8, DataType::Utf8View],
                    );
                    Self { signature }
                }
            }

            impl Function for $name {
                fn name(&self) -> &str {
                    Self::NAME
                }

                fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn invoke_with_args(
                    &self,
                    args: ScalarFunctionArgs,
                ) -> datafusion_common::Result<ColumnarValue> {
                    let [json_arg, path_arg] = extract_args(self.name(), &args)?;
                    // Normalise both inputs to their view representations so a single
                    // code path handles every accepted input type.
                    let json_arg = compute::cast(&json_arg, &DataType::BinaryView)?;
                    let path_arg = compute::cast(&path_arg, &DataType::Utf8View)?;
                    let jsons = json_arg.as_binary_view();
                    let paths = path_arg.as_string_view();

                    let rows = jsons.len();
                    let mut builder = [<$type Builder>]::with_capacity(rows);
                    for row in 0..rows {
                        let converted = if jsons.is_valid(row) && paths.is_valid(row) {
                            get_json_by_path(jsons.value(row), paths.value(row))
                                .and_then(|sub_jsonb| jsonb::[<to_ $rust_type>](&sub_jsonb).ok())
                        } else {
                            None
                        };
                        builder.append_option(converted);
                    }

                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
                }
            }

            impl Display for $name {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    f.write_str(&Self::NAME.to_ascii_uppercase())
                }
            }
        }
    };
}
133
// `json_get_float(json, path)`: looks up `path` in JSONB `json` and converts the
// located value with `jsonb::to_f64`, returning a Float64 (NULL when not convertible).
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);
140
// `json_get_bool(json, path)`: looks up `path` in JSONB `json` and converts the
// located value with `jsonb::to_bool`, returning a Boolean (NULL when not convertible).
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
147
/// A value located by a `json_get_*` lookup.
///
/// It is handed to a [`JsonGetResultBuilder`] to be converted into the function's
/// output type.
enum JsonResultValue<'a> {
    /// JSONB bytes of the sub-value found by a path lookup on binary JSONB input.
    Jsonb(Vec<u8>),
    /// A struct child column plus the row index into it.
    ///
    /// Produced when the path maps directly onto a child column of a struct-typed
    /// JSON input.
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A `serde_json` value found by evaluating the path against the JSON document
    /// reconstructed from a struct-typed input.
    JsonStructByValue(&'a Value),
}
153
/// Output sink used by the shared `json_get_*` evaluation in [`JsonGet::invoke`].
///
/// Each function supplies an implementation that converts located values into its
/// own result array type.
trait JsonGetResultBuilder {
    /// Converts `value` and appends it as the next row.
    ///
    /// Implementations may append a null when the value cannot be converted to the
    /// target type. Returned errors are propagated to the caller.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends a null row (null input, null path, or nothing found).
    fn append_null(&mut self);

    /// Finishes the accumulated rows into an Arrow array.
    fn build(&mut self) -> ArrayRef;
}
161
/// Shared implementation behind `json_get_string` and `json_get_int`.
///
/// Unlike the macro-generated getters, it accepts either JSONB binary input or a
/// struct-typed JSON column (see [`JsonGet::invoke`]).
struct JsonGet {
    signature: Signature,
}
171
172impl JsonGet {
173 fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
174 where
175 F: Fn(usize) -> B,
176 B: JsonGetResultBuilder,
177 {
178 let [arg0, arg1] = extract_args("JSON_GET", &args)?;
179
180 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
181 let paths = arg1.as_string_view();
182
183 let mut builder = (builder_factory)(arg0.len());
184 match arg0.data_type() {
185 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
186 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
187 let jsons = arg0.as_binary_view();
188 jsonb_get(jsons, paths, &mut builder)?;
189 }
190 DataType::Struct(_) => {
191 let jsons = arg0.as_struct();
192 json_struct_get(jsons, paths, &mut builder)?
193 }
194 _ => {
195 return Err(DataFusionError::Execution(format!(
196 "JSON_GET not supported argument type {}",
197 arg0.data_type(),
198 )));
199 }
200 };
201
202 Ok(ColumnarValue::Array(builder.build()))
203 }
204}
205
206impl Default for JsonGet {
207 fn default() -> Self {
208 Self {
209 signature: Signature::any(2, Volatility::Immutable),
210 }
211 }
212}
213
/// `json_get_string(json, path)`: reads the value at `path` and returns it as `Utf8View`.
///
/// The input may be a JSONB binary or a struct-typed JSON column.
#[derive(Default)]
pub struct JsonGetString(JsonGet);
216
impl JsonGetString {
    /// SQL-visible name of the function.
    pub const NAME: &'static str = "json_get_string";
}
220
221impl Function for JsonGetString {
222 fn name(&self) -> &str {
223 Self::NAME
224 }
225
226 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
227 Ok(DataType::Utf8View)
228 }
229
230 fn signature(&self) -> &Signature {
231 &self.0.signature
232 }
233
234 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
235 struct StringResultBuilder(StringViewBuilder);
236
237 impl JsonGetResultBuilder for StringResultBuilder {
238 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
239 match value {
240 JsonResultValue::Jsonb(value) => {
241 self.0.append_option(jsonb::to_str(&value).ok())
242 }
243 JsonResultValue::JsonStructByColumn(column, i) => {
244 if let Some(v) = string_array_value_at_index(column, i) {
245 self.0.append_value(v);
246 } else {
247 self.0
248 .append_value(arrow_cast::display::array_value_to_string(
249 column, i,
250 )?);
251 }
252 }
253 JsonResultValue::JsonStructByValue(value) => {
254 if let Some(s) = value.as_str() {
255 self.0.append_value(s)
256 } else {
257 self.0.append_value(value.to_string())
258 }
259 }
260 }
261 Ok(())
262 }
263
264 fn append_null(&mut self) {
265 self.0.append_null();
266 }
267
268 fn build(&mut self) -> ArrayRef {
269 Arc::new(self.0.finish())
270 }
271 }
272
273 self.0.invoke(args, |len: usize| {
274 StringResultBuilder(StringViewBuilder::with_capacity(len))
275 })
276 }
277}
278
/// `json_get_int(json, path)`: reads the value at `path` and returns it as `Int64`.
///
/// The input may be a JSONB binary or a struct-typed JSON column.
#[derive(Default)]
pub struct JsonGetInt(JsonGet);
281
impl JsonGetInt {
    /// SQL-visible name of the function.
    pub const NAME: &'static str = "json_get_int";
}
285
286impl Function for JsonGetInt {
287 fn name(&self) -> &str {
288 Self::NAME
289 }
290
291 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
292 Ok(DataType::Int64)
293 }
294
295 fn signature(&self) -> &Signature {
296 &self.0.signature
297 }
298
299 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
300 struct IntResultBuilder(Int64Builder);
301
302 impl JsonGetResultBuilder for IntResultBuilder {
303 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
304 match value {
305 JsonResultValue::Jsonb(value) => {
306 self.0.append_option(jsonb::to_i64(&value).ok())
307 }
308 JsonResultValue::JsonStructByColumn(column, i) => {
309 self.0.append_option(int_array_value_at_index(column, i))
310 }
311 JsonResultValue::JsonStructByValue(value) => {
312 self.0.append_option(value.as_i64())
313 }
314 }
315 Ok(())
316 }
317
318 fn append_null(&mut self) {
319 self.0.append_null();
320 }
321
322 fn build(&mut self) -> ArrayRef {
323 Arc::new(self.0.finish())
324 }
325 }
326
327 self.0.invoke(args, |len: usize| {
328 IntResultBuilder(Int64Builder::with_capacity(len))
329 })
330 }
331}
332
333impl Display for JsonGetInt {
334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335 write!(f, "{}", Self::NAME.to_ascii_uppercase())
336 }
337}
338
339fn jsonb_get(
340 jsons: &BinaryViewArray,
341 paths: &StringViewArray,
342 builder: &mut impl JsonGetResultBuilder,
343) -> Result<()> {
344 let size = jsons.len();
345 for i in 0..size {
346 let json = jsons.is_valid(i).then(|| jsons.value(i));
347 let path = paths.is_valid(i).then(|| paths.value(i));
348 let result = match (json, path) {
349 (Some(json), Some(path)) => get_json_by_path(json, path),
350 _ => None,
351 };
352 if let Some(v) = result {
353 builder.append_value(JsonResultValue::Jsonb(v))?;
354 } else {
355 builder.append_null();
356 }
357 }
358 Ok(())
359}
360
361fn json_struct_get(
362 jsons: &StructArray,
363 paths: &StringViewArray,
364 builder: &mut impl JsonGetResultBuilder,
365) -> Result<()> {
366 let size = jsons.len();
367 for i in 0..size {
368 if jsons.is_null(i) || paths.is_null(i) {
369 builder.append_null();
370 continue;
371 }
372 let path = paths.value(i);
373
374 let field_path = path.trim().replace("$.", "");
376 let column = jsons.column_by_name(&field_path);
377
378 if let Some(column) = column {
379 builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
380 } else {
381 let Some(raw) = jsons
382 .column_by_name(JsonStructureSettings::RAW_FIELD)
383 .and_then(|x| string_array_value_at_index(x, i))
384 else {
385 builder.append_null();
386 continue;
387 };
388
389 let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
390 DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
391 })?;
392 let value = json_struct_to_value(raw, jsons, i)?;
395
396 match path.find(&value) {
397 Value::Null => builder.append_null(),
398 Value::Array(values) => match values.as_slice() {
399 [] => builder.append_null(),
400 [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
401 _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
402 },
403 value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
404 }
405 }
406 }
407
408 Ok(())
409}
410
411fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
412 let Ok(mut json) = Value::from_str(raw) else {
413 return Err(DataFusionError::Internal(format!(
414 "inner field '{}' is not a valid JSON string",
415 JsonStructureSettings::RAW_FIELD
416 )));
417 };
418
419 for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
420 if column_name == JsonStructureSettings::RAW_FIELD {
421 continue;
422 }
423
424 let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
425 {
426 let json_pointer = format!("/{}", json_object.replace(".", "/"));
427 (json_pointer, field)
428 } else {
429 ("".to_string(), column_name)
430 };
431 let Some(json_object) = json
432 .pointer_mut(&json_pointer)
433 .and_then(|x| x.as_object_mut())
434 else {
435 return Err(DataFusionError::Internal(format!(
436 "value at JSON pointer '{}' is not an object",
437 json_pointer
438 )));
439 };
440
441 macro_rules! insert {
442 ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
443 if let Some(value) = $column
444 .is_valid($i)
445 .then(|| serde_json::Value::from($column.value($i)))
446 {
447 $json_object.insert($field.to_string(), value);
448 }
449 }};
450 }
451
452 match column.data_type() {
453 DataType::Boolean => {
455 let column = column.as_boolean();
456 insert!(column, i, json_object, field);
457 }
458 DataType::Int64 => {
460 let column = column.as_primitive::<Int64Type>();
461 insert!(column, i, json_object, field);
462 }
463 DataType::UInt64 => {
464 let column = column.as_primitive::<UInt64Type>();
465 insert!(column, i, json_object, field);
466 }
467 DataType::Float64 => {
468 let column = column.as_primitive::<Float64Type>();
469 insert!(column, i, json_object, field);
470 }
471 DataType::Utf8 => {
473 let column = column.as_string::<i32>();
474 insert!(column, i, json_object, field);
475 }
476 DataType::LargeUtf8 => {
477 let column = column.as_string::<i64>();
478 insert!(column, i, json_object, field);
479 }
480 DataType::Utf8View => {
481 let column = column.as_string_view();
482 insert!(column, i, json_object, field);
483 }
484 _ => {
486 return Err(DataFusionError::NotImplemented(format!(
487 "{} is not yet supported to be executed with field {} of datatype {}",
488 JsonGetString::NAME,
489 column_name,
490 column.data_type()
491 )));
492 }
493 }
494 }
495 Ok(json)
496}
497
498impl Display for JsonGetString {
499 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
500 write!(f, "{}", Self::NAME.to_ascii_uppercase())
501 }
502}
503
/// `json_get_object(json, path)`: returns the JSONB sub-value at `path`, but only if
/// that value is a JSON object.
///
/// Otherwise it returns NULL. Output is `BinaryView`.
pub(super) struct JsonGetObject {
    signature: Signature,
}
508
impl JsonGetObject {
    /// SQL-visible name of the function.
    const NAME: &'static str = "json_get_object";
}
512
513impl Default for JsonGetObject {
514 fn default() -> Self {
515 Self {
516 signature: helper::one_of_sigs2(
517 vec![
518 DataType::Binary,
519 DataType::LargeBinary,
520 DataType::BinaryView,
521 ],
522 STRINGS.to_vec(),
523 ),
524 }
525 }
526}
527
528impl Function for JsonGetObject {
529 fn name(&self) -> &str {
530 Self::NAME
531 }
532
533 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
534 Ok(DataType::BinaryView)
535 }
536
537 fn signature(&self) -> &Signature {
538 &self.signature
539 }
540
541 fn invoke_with_args(
542 &self,
543 args: ScalarFunctionArgs,
544 ) -> datafusion_common::Result<ColumnarValue> {
545 let [arg0, arg1] = extract_args(self.name(), &args)?;
546 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
547 let jsons = arg0.as_binary_view();
548 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
549 let paths = arg1.as_string_view();
550
551 let len = jsons.len();
552 let mut builder = BinaryViewBuilder::with_capacity(len);
553
554 for i in 0..len {
555 let json = jsons.is_valid(i).then(|| jsons.value(i));
556 let path = paths.is_valid(i).then(|| paths.value(i));
557 let result = if let (Some(json), Some(path)) = (json, path) {
558 let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
559 let mut data = Vec::new();
560 let mut offset = Vec::new();
561 jsonb::get_by_path(json, path, &mut data, &mut offset)
562 .map(|()| jsonb::is_object(&data).then_some(data))
563 });
564 result.map_err(|e| DataFusionError::Execution(e.to_string()))?
565 } else {
566 None
567 };
568 builder.append_option(result);
569 }
570
571 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
572 }
573}
574
575impl Display for JsonGetObject {
576 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
577 write!(f, "{}", Self::NAME.to_ascii_uppercase())
578 }
579}
580
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Float64Array, Int64Array, StructArray};
    use arrow_schema::Field;
    use datafusion_common::ScalarValue;
    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
    use datatypes::types::parse_string_to_jsonb;
    use serde_json::json;

    use super::*;

    /// Builds a one-row struct-typed JSON column.
    ///
    /// `kind`, `payload.code` and `payload.result.time_cost` are typed child columns.
    /// The remaining fields (`payload.success`, `payload.result.error`) are kept as a
    /// JSON string in the raw field.
    fn test_json_struct() -> ArrayRef {
        Arc::new(StructArray::new(
            vec![
                Field::new("kind", DataType::Utf8, true),
                Field::new("payload.code", DataType::Int64, true),
                Field::new("payload.result.time_cost", DataType::Float64, true),
                Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
            ]
            .into(),
            vec![
                Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
                Arc::new(Int64Array::from_iter([Some(404)])),
                Arc::new(Float64Array::from_iter([Some(1.234)])),
                Arc::new(StringViewArray::from_iter([Some(
                    json! ({
                        "payload": {
                            "success": false,
                            "result": {
                                "error": "not found"
                            }
                        }
                    })
                    .to_string(),
                )])),
            ],
            None,
        ))
    }

    #[test]
    fn test_json_get_int() {
        let json_get_int = JsonGetInt::default();

        assert_eq!("json_get_int", json_get_int.name());
        assert_eq!(
            DataType::Int64,
            json_get_int
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        // The first three cases run against JSONB inputs, one document per case.
        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let json_struct = test_json_struct();

        // The remaining cases all run against the struct column from `test_json_struct`.
        let path_expects = vec![
            ("$.a.b", Some(2)),
            ("$.a", Some(4)),
            ("$.c", None),
            ("$.kind", None),
            ("$.payload.code", Some(404)),
            ("$.payload.success", None),
            ("$.payload.result.time_cost", None),
            ("$.payload.not-exists", None),
            ("$.not-exists", None),
            ("$", None),
        ];

        let mut jsons = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
            })
            .collect::<Vec<_>>();
        let json_struct_arrays =
            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
        jsons.extend(json_struct_arrays);

        for i in 0..jsons.len() {
            let json = &jsons[i];
            let (path, expect) = path_expects[i];

            let args = ScalarFunctionArgs {
                args: vec![
                    ColumnarValue::Array(json.clone()),
                    ColumnarValue::Scalar(path.into()),
                ],
                arg_fields: vec![],
                number_rows: 1,
                return_field: Arc::new(Field::new("x", DataType::Int64, false)),
                config_options: Arc::new(Default::default()),
            };
            let result = json_get_int
                .invoke_with_args(args)
                .and_then(|x| x.to_array(1))
                .unwrap();

            let result = result.as_primitive::<Int64Type>();
            assert_eq!(1, result.len());
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_json_get_float() {
        let json_get_float = JsonGetFloat::default();

        assert_eq!("json_get_float", json_get_float.name());
        assert_eq!(
            DataType::Float64,
            json_get_float
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        // Row i of the JSON column is paired with row i of the path column.
        let paths = vec!["$.a.b", "$.a", "$.c"];
        let results = [Some(2.1), Some(4.4), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Float64, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_float
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_primitive::<Float64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_bool() {
        let json_get_bool = JsonGetBool::default();

        assert_eq!("json_get_bool", json_get_bool.name());
        assert_eq!(
            DataType::Boolean,
            json_get_bool
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        // Row i of the JSON column is paired with row i of the path column.
        let paths = vec!["$.a.b", "$.a", "$.c"];
        let results = [Some(true), Some(false), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_bool
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_boolean();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_string() {
        let json_get_string = JsonGetString::default();

        assert_eq!("json_get_string", json_get_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_get_string
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        // The first three cases run against JSONB inputs.
        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        let json_struct = test_json_struct();

        // From index 3 onward, each case runs against the struct column. These cover
        // direct column hits, raw-field lookups, and reconstruction of whole objects.
        let paths = vec![
            "$.a.b",
            "$.a",
            "",
            "$.kind",
            "$.payload.code",
            "$.payload.result.time_cost",
            "$.payload",
            "$.payload.success",
            "$.payload.result",
            "$.payload.result.error",
            "$.payload.result.not-exists",
            "$.payload.not-exists",
            "$.not-exists",
            "$",
        ];
        let expects = [
            Some("a"),
            Some("d"),
            None,
            Some("foo"),
            Some("404"),
            Some("1.234"),
            Some(
                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
            ),
            Some("false"),
            Some(r#"{"error":"not found","time_cost":1.234}"#),
            Some("not found"),
            None,
            None,
            None,
            Some(
                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
            ),
        ];

        let mut jsons = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
            })
            .collect::<Vec<_>>();
        let json_struct_arrays =
            std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
        jsons.extend(json_struct_arrays);

        for i in 0..jsons.len() {
            let json = &jsons[i];
            let path = paths[i];
            let expect = expects[i];

            let args = ScalarFunctionArgs {
                args: vec![
                    ColumnarValue::Array(json.clone()),
                    ColumnarValue::Scalar(path.into()),
                ],
                arg_fields: vec![],
                number_rows: 1,
                return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
                config_options: Arc::new(Default::default()),
            };
            let result = json_get_string
                .invoke_with_args(args)
                .and_then(|x| x.to_array(1))
                .unwrap();

            let result = result.as_string_view();
            assert_eq!(1, result.len());
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_json_get_object() -> Result<()> {
        let udf = JsonGetObject::default();
        assert_eq!("json_get_object", udf.name());
        assert_eq!(
            DataType::BinaryView,
            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
        );

        // One scalar JSONB document is evaluated against several paths. Only the paths
        // that resolve to an object yield a value.
        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
        let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
        let number_rows = paths.len();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("x", DataType::Binary, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = udf
            .invoke_with_args(args)
            .and_then(|x| x.to_array(number_rows))?;
        let result = result.as_binary_view();

        let expected = &BinaryViewArray::from_iter(
            vec![
                Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
                Some(r#"{"b": {"c": {"d": 1}}}"#),
                Some(r#"{"c": {"d": 1}}"#),
                Some(r#"{"d": 1}"#),
                None,
                None,
                None,
            ]
            .into_iter()
            .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
        );
        assert_eq!(result, expected);
        Ok(())
    }
}
948}