1use std::str::FromStr;
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
19use arrow::compute;
20use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
21use arrow_schema::Field;
22use datafusion_common::arrow::array::{
23 Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24 StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result};
28use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
29use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
30use datatypes::json::JsonStructureSettings;
31use derive_more::Display;
32use jsonpath_rust::JsonPath;
33use serde_json::Value;
34
35use crate::function::{Function, extract_args};
36use crate::helper;
37
38fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
39 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
40 match json_path {
41 Ok(json_path) => {
42 let mut sub_jsonb = Vec::new();
43 let mut sub_offsets = Vec::new();
44 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
45 Ok(_) => Some(sub_jsonb),
46 Err(_) => None,
47 }
48 }
49 _ => None,
50 }
51}
52
/// A value located by a JSON path lookup, handed to a [`JsonGetResultBuilder`] for conversion
/// into the builder's output type.
enum JsonResultValue<'a> {
    /// Bytes produced by a `jsonb` path lookup on a binary JSON input.
    Jsonb(Vec<u8>),
    /// A column of a JSON struct array together with the row index to read from it.
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A `serde_json` value found inside the JSON document rebuilt from a struct row.
    JsonStructByValue(&'a Value),
}
58
/// Accumulates per-row lookup results into an Arrow array of one concrete type.
trait JsonGetResultBuilder {
    /// Converts `value` to the builder's output type and appends it as the next row.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends a null as the next row.
    fn append_null(&mut self);

    /// Finishes the underlying builder and returns the appended rows as an array.
    fn build(&mut self) -> ArrayRef;
}
66
/// Shared implementation wrapped by the typed lookup functions in this file
/// (`JsonGetString`, `JsonGetInt`, `JsonGetFloat` and `JsonGetBool`).
#[derive(Debug)]
struct JsonGet {
    /// Signature reported by the wrapping functions.
    signature: Signature,
}
77
78impl JsonGet {
79 fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
80 where
81 F: Fn(usize) -> B,
82 B: JsonGetResultBuilder,
83 {
84 let [arg0, arg1] = extract_args("JSON_GET", &args)?;
85
86 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
87 let paths = arg1.as_string_view();
88
89 let mut builder = (builder_factory)(arg0.len());
90 match arg0.data_type() {
91 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
92 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
93 let jsons = arg0.as_binary_view();
94 jsonb_get(jsons, paths, &mut builder)?;
95 }
96 DataType::Struct(_) => {
97 let jsons = arg0.as_struct();
98 json_struct_get(jsons, paths, &mut builder)?
99 }
100 _ => {
101 return Err(DataFusionError::Execution(format!(
102 "JSON_GET not supported argument type {}",
103 arg0.data_type(),
104 )));
105 }
106 };
107
108 Ok(ColumnarValue::Array(builder.build()))
109 }
110}
111
impl Default for JsonGet {
    /// Creates the shared implementation with an immutable signature taking two arguments of
    /// any type.
    fn default() -> Self {
        Self {
            signature: Signature::any(2, Volatility::Immutable),
        }
    }
}
119
120struct StringResultBuilder(StringViewBuilder);
122
123impl JsonGetResultBuilder for StringResultBuilder {
124 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
125 match value {
126 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_str(&value).ok()),
127 JsonResultValue::JsonStructByColumn(column, i) => {
128 if let Some(v) = string_array_value_at_index(column, i) {
129 self.0.append_value(v);
130 } else {
131 self.0
132 .append_value(arrow_cast::display::array_value_to_string(column, i)?);
133 }
134 }
135 JsonResultValue::JsonStructByValue(value) => {
136 if let Some(s) = value.as_str() {
137 self.0.append_value(s)
138 } else {
139 self.0.append_value(value.to_string())
140 }
141 }
142 }
143 Ok(())
144 }
145
146 fn append_null(&mut self) {
147 self.0.append_null();
148 }
149
150 fn build(&mut self) -> ArrayRef {
151 Arc::new(self.0.finish())
152 }
153}
154
155#[derive(Default, Display, Debug)]
156#[display("{}", Self::NAME.to_ascii_uppercase())]
157pub struct JsonGetString(JsonGet);
158
159impl JsonGetString {
160 pub const NAME: &'static str = "json_get_string";
161}
162
163impl Function for JsonGetString {
164 fn name(&self) -> &str {
165 Self::NAME
166 }
167
168 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
169 Ok(DataType::Utf8View)
170 }
171
172 fn signature(&self) -> &Signature {
173 &self.0.signature
174 }
175
176 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
177 self.0.invoke(args, |len: usize| {
178 StringResultBuilder(StringViewBuilder::with_capacity(len))
179 })
180 }
181}
182
183struct IntResultBuilder(Int64Builder);
184
185impl JsonGetResultBuilder for IntResultBuilder {
186 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
187 match value {
188 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_i64(&value).ok()),
189 JsonResultValue::JsonStructByColumn(column, i) => {
190 self.0.append_option(int_array_value_at_index(column, i))
191 }
192 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_i64()),
193 }
194 Ok(())
195 }
196
197 fn append_null(&mut self) {
198 self.0.append_null();
199 }
200
201 fn build(&mut self) -> ArrayRef {
202 Arc::new(self.0.finish())
203 }
204}
205
206#[derive(Default, Display, Debug)]
207#[display("{}", Self::NAME.to_ascii_uppercase())]
208pub struct JsonGetInt(JsonGet);
209
210impl JsonGetInt {
211 pub const NAME: &'static str = "json_get_int";
212}
213
214impl Function for JsonGetInt {
215 fn name(&self) -> &str {
216 Self::NAME
217 }
218
219 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
220 Ok(DataType::Int64)
221 }
222
223 fn signature(&self) -> &Signature {
224 &self.0.signature
225 }
226
227 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
228 self.0.invoke(args, |len: usize| {
229 IntResultBuilder(Int64Builder::with_capacity(len))
230 })
231 }
232}
233
234struct FloatResultBuilder(Float64Builder);
235
236impl JsonGetResultBuilder for FloatResultBuilder {
237 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
238 match value {
239 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_f64(&value).ok()),
240 JsonResultValue::JsonStructByColumn(column, i) => {
241 let result = if column.data_type() == &DataType::Float64 {
242 column
243 .as_primitive::<Float64Type>()
244 .is_valid(i)
245 .then(|| column.as_primitive::<Float64Type>().value(i))
246 } else {
247 None
248 };
249 self.0.append_option(result);
250 }
251 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_f64()),
252 }
253 Ok(())
254 }
255
256 fn append_null(&mut self) {
257 self.0.append_null();
258 }
259
260 fn build(&mut self) -> ArrayRef {
261 Arc::new(self.0.finish())
262 }
263}
264
265#[derive(Default, Display, Debug)]
266#[display("{}", Self::NAME.to_ascii_uppercase())]
267pub struct JsonGetFloat(JsonGet);
268
269impl JsonGetFloat {
270 pub const NAME: &'static str = "json_get_float";
271}
272
273impl Function for JsonGetFloat {
274 fn name(&self) -> &str {
275 Self::NAME
276 }
277
278 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
279 Ok(DataType::Float64)
280 }
281
282 fn signature(&self) -> &Signature {
283 &self.0.signature
284 }
285
286 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
287 self.0.invoke(args, |len: usize| {
288 FloatResultBuilder(Float64Builder::with_capacity(len))
289 })
290 }
291}
292
293struct BoolResultBuilder(BooleanBuilder);
294
295impl JsonGetResultBuilder for BoolResultBuilder {
296 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
297 match value {
298 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_bool(&value).ok()),
299 JsonResultValue::JsonStructByColumn(column, i) => {
300 let result = if column.data_type() == &DataType::Boolean {
301 column
302 .as_boolean()
303 .is_valid(i)
304 .then(|| column.as_boolean().value(i))
305 } else {
306 None
307 };
308 self.0.append_option(result);
309 }
310 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_bool()),
311 }
312 Ok(())
313 }
314
315 fn append_null(&mut self) {
316 self.0.append_null();
317 }
318
319 fn build(&mut self) -> ArrayRef {
320 Arc::new(self.0.finish())
321 }
322}
323
324#[derive(Default, Display, Debug)]
325#[display("{}", Self::NAME.to_ascii_uppercase())]
326pub struct JsonGetBool(JsonGet);
327
328impl JsonGetBool {
329 pub const NAME: &'static str = "json_get_bool";
330}
331
332impl Function for JsonGetBool {
333 fn name(&self) -> &str {
334 Self::NAME
335 }
336
337 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
338 Ok(DataType::Boolean)
339 }
340
341 fn signature(&self) -> &Signature {
342 &self.0.signature
343 }
344
345 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
346 self.0.invoke(args, |len: usize| {
347 BoolResultBuilder(BooleanBuilder::with_capacity(len))
348 })
349 }
350}
351
352fn jsonb_get(
353 jsons: &BinaryViewArray,
354 paths: &StringViewArray,
355 builder: &mut dyn JsonGetResultBuilder,
356) -> Result<()> {
357 let size = jsons.len();
358 for i in 0..size {
359 let json = jsons.is_valid(i).then(|| jsons.value(i));
360 let path = paths.is_valid(i).then(|| paths.value(i));
361 let result = match (json, path) {
362 (Some(json), Some(path)) => get_json_by_path(json, path),
363 _ => None,
364 };
365 if let Some(v) = result {
366 builder.append_value(JsonResultValue::Jsonb(v))?;
367 } else {
368 builder.append_null();
369 }
370 }
371 Ok(())
372}
373
374fn json_struct_get(
375 jsons: &StructArray,
376 paths: &StringViewArray,
377 builder: &mut dyn JsonGetResultBuilder,
378) -> Result<()> {
379 let size = jsons.len();
380 for i in 0..size {
381 if jsons.is_null(i) || paths.is_null(i) {
382 builder.append_null();
383 continue;
384 }
385 let path = paths.value(i);
386
387 let field_path = path.trim().replace("$.", "");
389 let column = jsons.column_by_name(&field_path);
390
391 if let Some(column) = column {
392 builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
393 } else {
394 let Some(raw) = jsons
395 .column_by_name(JsonStructureSettings::RAW_FIELD)
396 .and_then(|x| string_array_value_at_index(x, i))
397 else {
398 builder.append_null();
399 continue;
400 };
401
402 let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
403 DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
404 })?;
405 let value = json_struct_to_value(raw, jsons, i)?;
408
409 match path.find(&value) {
410 Value::Null => builder.append_null(),
411 Value::Array(values) => match values.as_slice() {
412 [] => builder.append_null(),
413 [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
414 _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
415 },
416 value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
417 }
418 }
419 }
420
421 Ok(())
422}
423
424fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
425 let Ok(mut json) = Value::from_str(raw) else {
426 return Err(DataFusionError::Internal(format!(
427 "inner field '{}' is not a valid JSON string",
428 JsonStructureSettings::RAW_FIELD
429 )));
430 };
431
432 for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
433 if column_name == JsonStructureSettings::RAW_FIELD {
434 continue;
435 }
436
437 let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
438 {
439 let json_pointer = format!("/{}", json_object.replace(".", "/"));
440 (json_pointer, field)
441 } else {
442 ("".to_string(), column_name)
443 };
444 let Some(json_object) = json
445 .pointer_mut(&json_pointer)
446 .and_then(|x| x.as_object_mut())
447 else {
448 return Err(DataFusionError::Internal(format!(
449 "value at JSON pointer '{}' is not an object",
450 json_pointer
451 )));
452 };
453
454 macro_rules! insert {
455 ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
456 if let Some(value) = $column
457 .is_valid($i)
458 .then(|| serde_json::Value::from($column.value($i)))
459 {
460 $json_object.insert($field.to_string(), value);
461 }
462 }};
463 }
464
465 match column.data_type() {
466 DataType::Boolean => {
468 let column = column.as_boolean();
469 insert!(column, i, json_object, field);
470 }
471 DataType::Int64 => {
473 let column = column.as_primitive::<Int64Type>();
474 insert!(column, i, json_object, field);
475 }
476 DataType::UInt64 => {
477 let column = column.as_primitive::<UInt64Type>();
478 insert!(column, i, json_object, field);
479 }
480 DataType::Float64 => {
481 let column = column.as_primitive::<Float64Type>();
482 insert!(column, i, json_object, field);
483 }
484 DataType::Utf8 => {
486 let column = column.as_string::<i32>();
487 insert!(column, i, json_object, field);
488 }
489 DataType::LargeUtf8 => {
490 let column = column.as_string::<i64>();
491 insert!(column, i, json_object, field);
492 }
493 DataType::Utf8View => {
494 let column = column.as_string_view();
495 insert!(column, i, json_object, field);
496 }
497 _ => {
499 return Err(DataFusionError::NotImplemented(format!(
500 "{} is not yet supported to be executed with field {} of datatype {}",
501 JsonGetString::NAME,
502 column_name,
503 column.data_type()
504 )));
505 }
506 }
507 }
508 Ok(json)
509}
510
/// Scalar function `json_get`: looks up a path in a JSON value and returns the result in the
/// data type of its third argument.
///
/// The return type is derived in `return_field_from_args`; it falls back to `Utf8View` when
/// no scalar third argument is available.
#[derive(Debug, Display)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub(super) struct JsonGetWithType {
    /// Signature reported by the function.
    signature: Signature,
}

impl JsonGetWithType {
    /// Name under which the function is registered.
    pub(crate) const NAME: &'static str = "json_get";
}

impl Default for JsonGetWithType {
    /// Creates the function with an immutable signature that accepts any number of arguments
    /// of any type.
    fn default() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}
531
532impl Function for JsonGetWithType {
533 fn name(&self) -> &str {
534 Self::NAME
535 }
536
537 fn return_type(&self, _input_types: &[DataType]) -> datafusion_common::Result<DataType> {
538 Err(DataFusionError::Internal(
539 "This method isn't meant to be called".to_string(),
540 ))
541 }
542
543 fn return_field_from_args(
544 &self,
545 args: datafusion_expr::ReturnFieldArgs<'_>,
546 ) -> datafusion_common::Result<Arc<Field>> {
547 match args.scalar_arguments.get(2) {
548 Some(Some(v)) => {
549 let mut data_type = v.data_type();
550 if matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
551 data_type = DataType::Utf8View;
552 }
553
554 Ok(Arc::new(Field::new(self.name(), data_type, true)))
555 }
556 _ => Ok(Arc::new(Field::new(self.name(), DataType::Utf8View, true))),
557 }
558 }
559
560 fn signature(&self) -> &Signature {
561 &self.signature
562 }
563
564 fn invoke_with_args(
565 &self,
566 args: ScalarFunctionArgs,
567 ) -> datafusion_common::Result<ColumnarValue> {
568 let [arg0, arg1, _] = extract_args(self.name(), &args)?;
569 let len = arg0.len();
570
571 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
572 let paths = arg1.as_string_view();
573
574 let mut builder: Box<dyn JsonGetResultBuilder> = match args.return_field.data_type() {
576 DataType::Utf8View => {
577 Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
578 }
579 DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
580 DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
581 DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
582 _type => {
583 return Err(DataFusionError::Internal(format!(
584 "Unsupported return type {}",
585 _type
586 )));
587 }
588 };
589
590 match arg0.data_type() {
591 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
592 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
593 let jsons = arg0.as_binary_view();
594 jsonb_get(jsons, paths, builder.as_mut())?;
595 }
596 DataType::Struct(_) => {
597 let jsons = arg0.as_struct();
598 json_struct_get(jsons, paths, builder.as_mut())?;
599 }
600 _ => {
601 return Err(DataFusionError::Execution(format!(
602 "JSON_GET not supported argument type {}",
603 arg0.data_type(),
604 )));
605 }
606 };
607
608 Ok(ColumnarValue::Array(builder.build()))
609 }
610}
611
/// Scalar function `json_get_object`: looks up a path in a binary JSON value and returns the
/// result only when it is a JSON object.
#[derive(Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub(super) struct JsonGetObject {
    /// Signature reported by the function.
    signature: Signature,
}

impl JsonGetObject {
    /// Name under which the function is registered.
    const NAME: &'static str = "json_get_object";
}
622
623impl Default for JsonGetObject {
624 fn default() -> Self {
625 Self {
626 signature: helper::one_of_sigs2(
627 vec![
628 DataType::Binary,
629 DataType::LargeBinary,
630 DataType::BinaryView,
631 ],
632 vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
633 ),
634 }
635 }
636}
637
638impl Function for JsonGetObject {
639 fn name(&self) -> &str {
640 Self::NAME
641 }
642
643 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
644 Ok(DataType::BinaryView)
645 }
646
647 fn signature(&self) -> &Signature {
648 &self.signature
649 }
650
651 fn invoke_with_args(
652 &self,
653 args: ScalarFunctionArgs,
654 ) -> datafusion_common::Result<ColumnarValue> {
655 let [arg0, arg1] = extract_args(self.name(), &args)?;
656 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
657 let jsons = arg0.as_binary_view();
658 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
659 let paths = arg1.as_string_view();
660
661 let len = jsons.len();
662 let mut builder = BinaryViewBuilder::with_capacity(len);
663
664 for i in 0..len {
665 let json = jsons.is_valid(i).then(|| jsons.value(i));
666 let path = paths.is_valid(i).then(|| paths.value(i));
667 let result = if let (Some(json), Some(path)) = (json, path) {
668 let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
669 let mut data = Vec::new();
670 let mut offset = Vec::new();
671 jsonb::get_by_path(json, path, &mut data, &mut offset)
672 .map(|()| jsonb::is_object(&data).then_some(data))
673 });
674 result.map_err(|e| DataFusionError::Execution(e.to_string()))?
675 } else {
676 None
677 };
678 builder.append_option(result);
679 }
680
681 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
682 }
683}
684
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Float64Array, Int64Array, StructArray};
    use arrow_schema::Field;
    use datafusion_common::ScalarValue;
    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
    use datatypes::types::parse_string_to_jsonb;
    use serde_json::json;

    use super::*;

    /// Builds a single-row JSON struct array with three typed columns (`kind`,
    /// `payload.code`, `payload.result.time_cost`) plus the raw JSON text holding the
    /// remaining fields.
    fn test_json_struct() -> ArrayRef {
        let fields = vec![
            Field::new("kind", DataType::Utf8, true),
            Field::new("payload.code", DataType::Int64, true),
            Field::new("payload.result.time_cost", DataType::Float64, true),
            Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
        ];
        let raw = json! ({
            "payload": {
                "success": false,
                "result": {
                    "error": "not found"
                }
            }
        })
        .to_string();
        let columns = vec![
            Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
            Arc::new(Int64Array::from_iter([Some(404)])),
            Arc::new(Float64Array::from_iter([Some(1.234)])),
            Arc::new(StringViewArray::from_iter([Some(raw)])),
        ];
        Arc::new(StructArray::new(fields.into(), columns, None))
    }

    /// Builds one input per test case: each JSON text as its own single-row JSONB array,
    /// followed by the JSON struct array repeated until there are `total` inputs.
    fn test_inputs(json_strings: &[&str], total: usize) -> Vec<ArrayRef> {
        let mut jsons = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
            })
            .collect::<Vec<_>>();
        let padding = total - jsons.len();
        jsons.extend(std::iter::repeat_n(test_json_struct(), padding));
        jsons
    }

    /// Arguments for a two-argument lookup: the JSON array and a scalar path.
    fn json_and_path(json: &ArrayRef, path: &str) -> Vec<ColumnarValue> {
        vec![
            ColumnarValue::Array(json.clone()),
            ColumnarValue::Scalar(path.into()),
        ]
    }

    /// Arguments for `json_get`: the JSON array, a scalar path and a scalar type name.
    fn json_path_and_type(json: &ArrayRef, path: &str, type_name: &str) -> Vec<ColumnarValue> {
        let mut args = json_and_path(json, path);
        args.push(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
            type_name.to_string(),
        ))));
        args
    }

    /// Invokes `function` on single-row arguments and returns its single-row result.
    fn invoke_single_row<F: Function>(
        function: &F,
        args: Vec<ColumnarValue>,
        return_type: DataType,
    ) -> ArrayRef {
        let args = ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new("x", return_type, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = function
            .invoke_with_args(args)
            .and_then(|x| x.to_array(1))
            .unwrap();
        assert_eq!(1, result.len());
        result
    }

    #[test]
    fn test_json_get_int() {
        let json_get_int = JsonGetInt::default();

        assert_eq!("json_get_int", json_get_int.name());
        assert_eq!(
            DataType::Int64,
            json_get_int
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let path_expects = vec![
            ("$.a.b", Some(2)),
            ("$.a", Some(4)),
            ("$.c", None),
            ("$.kind", None),
            ("$.payload.code", Some(404)),
            ("$.payload.success", None),
            ("$.payload.result.time_cost", None),
            ("$.payload.not-exists", None),
            ("$.not-exists", None),
            ("$", None),
        ];

        let jsons = test_inputs(&json_strings, path_expects.len());
        for (json, (path, expect)) in jsons.iter().zip(path_expects) {
            let result =
                invoke_single_row(&json_get_int, json_and_path(json, path), DataType::Int64);
            let result = result.as_primitive::<Int64Type>();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_json_get_float() {
        let json_get_float = JsonGetFloat::default();

        assert_eq!("json_get_float", json_get_float.name());
        assert_eq!(
            DataType::Float64,
            json_get_float
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        let path_expects = vec![
            ("$.a.b", Some(2.1)),
            ("$.a", Some(4.4)),
            ("$.c", None),
            ("$.kind", None),
            ("$.payload.code", None),
            ("$.payload.success", None),
            ("$.payload.result.time_cost", Some(1.234)),
            ("$.payload.not-exists", None),
            ("$.not-exists", None),
            ("$", None),
        ];

        let jsons = test_inputs(&json_strings, path_expects.len());
        for (json, (path, expect)) in jsons.iter().zip(path_expects) {
            let result = invoke_single_row(
                &json_get_float,
                json_and_path(json, path),
                DataType::Float64,
            );
            let result = result.as_primitive::<Float64Type>();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_json_get_bool() {
        let json_get_bool = JsonGetBool::default();

        assert_eq!("json_get_bool", json_get_bool.name());
        assert_eq!(
            DataType::Boolean,
            json_get_bool
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        let path_expects = vec![
            ("$.a.b", Some(true)),
            ("$.a", Some(false)),
            ("$.c", None),
            ("$.kind", None),
            ("$.payload.code", None),
            ("$.payload.success", Some(false)),
            ("$.payload.result.time_cost", None),
            ("$.payload.not-exists", None),
            ("$.not-exists", None),
            ("$", None),
        ];

        let jsons = test_inputs(&json_strings, path_expects.len());
        for (json, (path, expect)) in jsons.iter().zip(path_expects) {
            let result = invoke_single_row(
                &json_get_bool,
                json_and_path(json, path),
                DataType::Boolean,
            );
            let result = result.as_boolean();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, expect);
        }
    }

    #[test]
    fn test_json_get_string() {
        let json_get_string = JsonGetString::default();

        assert_eq!("json_get_string", json_get_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_get_string
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        let paths = vec![
            "$.a.b",
            "$.a",
            "",
            "$.kind",
            "$.payload.code",
            "$.payload.result.time_cost",
            "$.payload",
            "$.payload.success",
            "$.payload.result",
            "$.payload.result.error",
            "$.payload.result.not-exists",
            "$.payload.not-exists",
            "$.not-exists",
            "$",
        ];
        let expects = [
            Some("a"),
            Some("d"),
            None,
            Some("foo"),
            Some("404"),
            Some("1.234"),
            Some(
                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
            ),
            Some("false"),
            Some(r#"{"error":"not found","time_cost":1.234}"#),
            Some("not found"),
            None,
            None,
            None,
            Some(
                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
            ),
        ];

        let jsons = test_inputs(&json_strings, expects.len());
        for (json, (path, expect)) in jsons.iter().zip(paths.iter().zip(expects.iter())) {
            let result = invoke_single_row(
                &json_get_string,
                json_and_path(json, path),
                DataType::Utf8View,
            );
            let result = result.as_string_view();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, *expect);
        }
    }

    #[test]
    fn test_json_get_object() -> Result<()> {
        let udf = JsonGetObject::default();
        assert_eq!("json_get_object", udf.name());
        assert_eq!(
            DataType::BinaryView,
            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
        );

        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
        let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
        let number_rows = paths.len();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("x", DataType::Binary, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = udf
            .invoke_with_args(args)
            .and_then(|x| x.to_array(number_rows))?;
        let result = result.as_binary_view();

        // Only paths that select an object give a value; scalars and misses give null.
        let expected_texts = vec![
            Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
            Some(r#"{"b": {"c": {"d": 1}}}"#),
            Some(r#"{"c": {"d": 1}}"#),
            Some(r#"{"d": 1}"#),
            None,
            None,
            None,
        ];
        let expected = BinaryViewArray::from_iter(
            expected_texts
                .into_iter()
                .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
        );
        assert_eq!(result, &expected);
        Ok(())
    }

    #[test]
    fn test_json_get_with_type() {
        let json_get_with_type = JsonGetWithType::default();

        assert_eq!("json_get", json_get_with_type.name());

        // Requested type: string.
        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        let paths = vec![
            "$.a.b",
            "$.a",
            "",
            "$.kind",
            "$.payload.code",
            "$.payload.result.time_cost",
            "$.payload",
            "$.payload.success",
            "$.payload.result",
            "$.payload.result.error",
            "$.payload.result.not-exists",
            "$.payload.not-exists",
            "$.not-exists",
            "$",
        ];
        let expects = [
            Some("a"),
            Some("d"),
            None,
            Some("foo"),
            Some("404"),
            Some("1.234"),
            Some(
                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
            ),
            Some("false"),
            Some(r#"{"error":"not found","time_cost":1.234}"#),
            Some("not found"),
            None,
            None,
            None,
            Some(
                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
            ),
        ];

        let jsons = test_inputs(&json_strings, expects.len());
        for (json, (path, expect)) in jsons.iter().zip(paths.iter().zip(expects.iter())) {
            let result = invoke_single_row(
                &json_get_with_type,
                json_path_and_type(json, path, "string"),
                DataType::Utf8View,
            );
            let result = result.as_string_view();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, *expect);
        }

        // Requested type: int.
        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c", "$.payload.code"];
        let expects = [Some(2), Some(4), None, Some(404)];

        let jsons = test_inputs(&json_strings, paths.len());
        for (json, (path, expect)) in jsons.iter().zip(paths.iter().zip(expects.iter())) {
            let result = invoke_single_row(
                &json_get_with_type,
                json_path_and_type(json, path, "int"),
                DataType::Int64,
            );
            let result = result.as_primitive::<Int64Type>();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, *expect);
        }

        // Requested type: float.
        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c", "$.payload.result.time_cost"];
        let expects = [Some(2.1), Some(4.4), None, Some(1.234)];

        let jsons = test_inputs(&json_strings, paths.len());
        for (json, (path, expect)) in jsons.iter().zip(paths.iter().zip(expects.iter())) {
            let result = invoke_single_row(
                &json_get_with_type,
                json_path_and_type(json, path, "float"),
                DataType::Float64,
            );
            let result = result.as_primitive::<Float64Type>();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, *expect);
        }

        // Requested type: bool.
        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        let paths = ["$.a.b", "$.a", "$.c", "$.payload.success"];
        let expects = [Some(true), Some(false), None, Some(false)];

        let jsons = test_inputs(&json_strings, paths.len());
        for (json, (path, expect)) in jsons.iter().zip(paths.iter().zip(expects.iter())) {
            let result = invoke_single_row(
                &json_get_with_type,
                json_path_and_type(json, path, "bool"),
                DataType::Boolean,
            );
            let result = result.as_boolean();
            let actual = result.is_valid(0).then(|| result.value(0));
            assert_eq!(actual, *expect);
        }
    }
}