1use std::str::FromStr;
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
19use arrow::compute;
20use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
21use arrow_schema::Field;
22use datafusion_common::arrow::array::{
23 Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24 StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result, ScalarValue};
28use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
29use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
30use datatypes::json::JsonStructureSettings;
31use derive_more::Display;
32use jsonpath_rust::JsonPath;
33use serde_json::Value;
34
35use crate::function::{Function, extract_args};
36use crate::helper;
37
38fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
39 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
40 match json_path {
41 Ok(json_path) => {
42 let mut sub_jsonb = Vec::new();
43 let mut sub_offsets = Vec::new();
44 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
45 Ok(_) => Some(sub_jsonb),
46 Err(_) => None,
47 }
48 }
49 _ => None,
50 }
51}
52
/// A value located by a JSON path, in whichever representation the lookup produced it.
enum JsonResultValue<'a> {
    /// Owned bytes produced by the JSONB path lookup.
    Jsonb(Vec<u8>),
    /// A column of a JSON struct array together with the row index to read from it.
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A borrowed `serde_json` value found in the JSON reassembled from a JSON struct row.
    JsonStructByValue(&'a Value),
}
58
/// Accumulates the per-row results of a JSON lookup into an Arrow array of one output type.
trait JsonGetResultBuilder {
    /// Appends one located value, converting it to the builder's output type.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends a null for a row that has no result.
    fn append_null(&mut self);

    /// Finishes the builder and returns the accumulated array.
    fn build(&mut self) -> ArrayRef;
}
66
/// Shared implementation behind the typed `json_get_*` functions in this file.
///
/// It only stores the function signature; the typed wrappers supply the result builder when
/// they call `invoke`.
#[derive(Debug)]
struct JsonGet {
    signature: Signature,
}
77
78impl JsonGet {
79 fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
80 where
81 F: Fn(usize) -> B,
82 B: JsonGetResultBuilder,
83 {
84 let [arg0, arg1] = extract_args("JSON_GET", &args)?;
85
86 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
87 let paths = arg1.as_string_view();
88
89 let mut builder = (builder_factory)(arg0.len());
90 match arg0.data_type() {
91 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
92 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
93 let jsons = arg0.as_binary_view();
94 jsonb_get(jsons, paths, &mut builder)?;
95 }
96 DataType::Struct(_) => {
97 let jsons = arg0.as_struct();
98 json_struct_get(jsons, paths, &mut builder)?
99 }
100 _ => {
101 return Err(DataFusionError::Execution(format!(
102 "JSON_GET not supported argument type {}",
103 arg0.data_type(),
104 )));
105 }
106 };
107
108 Ok(ColumnarValue::Array(builder.build()))
109 }
110}
111
112impl Default for JsonGet {
113 fn default() -> Self {
114 Self {
115 signature: Signature::any(2, Volatility::Immutable),
116 }
117 }
118}
119
120struct StringResultBuilder(StringViewBuilder);
121
122impl JsonGetResultBuilder for StringResultBuilder {
123 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
124 match value {
125 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_str(&value).ok()),
126 JsonResultValue::JsonStructByColumn(column, i) => {
127 if let Some(v) = string_array_value_at_index(column, i) {
128 self.0.append_value(v);
129 } else {
130 self.0
131 .append_value(arrow_cast::display::array_value_to_string(column, i)?);
132 }
133 }
134 JsonResultValue::JsonStructByValue(value) => {
135 if let Some(s) = value.as_str() {
136 self.0.append_value(s)
137 } else {
138 self.0.append_value(value.to_string())
139 }
140 }
141 }
142 Ok(())
143 }
144
145 fn append_null(&mut self) {
146 self.0.append_null();
147 }
148
149 fn build(&mut self) -> ArrayRef {
150 Arc::new(self.0.finish())
151 }
152}
153
/// Scalar function `json_get_string`: looks up a JSON path and returns the result as a string.
///
/// Displays as the upper-cased function name.
#[derive(Default, Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub struct JsonGetString(JsonGet);

impl JsonGetString {
    /// Name reported by `Function::name` for this function.
    pub const NAME: &'static str = "json_get_string";
}
161
162impl Function for JsonGetString {
163 fn name(&self) -> &str {
164 Self::NAME
165 }
166
167 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
168 Ok(DataType::Utf8View)
169 }
170
171 fn signature(&self) -> &Signature {
172 &self.0.signature
173 }
174
175 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
176 self.0.invoke(args, |len: usize| {
177 StringResultBuilder(StringViewBuilder::with_capacity(len))
178 })
179 }
180}
181
182struct IntResultBuilder(Int64Builder);
183
184impl JsonGetResultBuilder for IntResultBuilder {
185 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
186 match value {
187 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_i64(&value).ok()),
188 JsonResultValue::JsonStructByColumn(column, i) => {
189 self.0.append_option(int_array_value_at_index(column, i))
190 }
191 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_i64()),
192 }
193 Ok(())
194 }
195
196 fn append_null(&mut self) {
197 self.0.append_null();
198 }
199
200 fn build(&mut self) -> ArrayRef {
201 Arc::new(self.0.finish())
202 }
203}
204
/// Scalar function `json_get_int`: looks up a JSON path and returns the result as an `Int64`.
///
/// Displays as the upper-cased function name.
#[derive(Default, Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub struct JsonGetInt(JsonGet);

impl JsonGetInt {
    /// Name reported by `Function::name` for this function.
    pub const NAME: &'static str = "json_get_int";
}
212
213impl Function for JsonGetInt {
214 fn name(&self) -> &str {
215 Self::NAME
216 }
217
218 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
219 Ok(DataType::Int64)
220 }
221
222 fn signature(&self) -> &Signature {
223 &self.0.signature
224 }
225
226 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
227 self.0.invoke(args, |len: usize| {
228 IntResultBuilder(Int64Builder::with_capacity(len))
229 })
230 }
231}
232
233struct FloatResultBuilder(Float64Builder);
234
235impl JsonGetResultBuilder for FloatResultBuilder {
236 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
237 match value {
238 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_f64(&value).ok()),
239 JsonResultValue::JsonStructByColumn(column, i) => {
240 let result = if column.data_type() == &DataType::Float64 {
241 column
242 .as_primitive::<Float64Type>()
243 .is_valid(i)
244 .then(|| column.as_primitive::<Float64Type>().value(i))
245 } else {
246 None
247 };
248 self.0.append_option(result);
249 }
250 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_f64()),
251 }
252 Ok(())
253 }
254
255 fn append_null(&mut self) {
256 self.0.append_null();
257 }
258
259 fn build(&mut self) -> ArrayRef {
260 Arc::new(self.0.finish())
261 }
262}
263
/// Scalar function `json_get_float`: looks up a JSON path and returns the result as a `Float64`.
///
/// Displays as the upper-cased function name.
#[derive(Default, Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub struct JsonGetFloat(JsonGet);

impl JsonGetFloat {
    /// Name reported by `Function::name` for this function.
    pub const NAME: &'static str = "json_get_float";
}
271
272impl Function for JsonGetFloat {
273 fn name(&self) -> &str {
274 Self::NAME
275 }
276
277 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
278 Ok(DataType::Float64)
279 }
280
281 fn signature(&self) -> &Signature {
282 &self.0.signature
283 }
284
285 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
286 self.0.invoke(args, |len: usize| {
287 FloatResultBuilder(Float64Builder::with_capacity(len))
288 })
289 }
290}
291
292struct BoolResultBuilder(BooleanBuilder);
293
294impl JsonGetResultBuilder for BoolResultBuilder {
295 fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
296 match value {
297 JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_bool(&value).ok()),
298 JsonResultValue::JsonStructByColumn(column, i) => {
299 let result = if column.data_type() == &DataType::Boolean {
300 column
301 .as_boolean()
302 .is_valid(i)
303 .then(|| column.as_boolean().value(i))
304 } else {
305 None
306 };
307 self.0.append_option(result);
308 }
309 JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_bool()),
310 }
311 Ok(())
312 }
313
314 fn append_null(&mut self) {
315 self.0.append_null();
316 }
317
318 fn build(&mut self) -> ArrayRef {
319 Arc::new(self.0.finish())
320 }
321}
322
/// Scalar function `json_get_bool`: looks up a JSON path and returns the result as a `Boolean`.
///
/// Displays as the upper-cased function name.
#[derive(Default, Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub struct JsonGetBool(JsonGet);

impl JsonGetBool {
    /// Name reported by `Function::name` for this function.
    pub const NAME: &'static str = "json_get_bool";
}
330
331impl Function for JsonGetBool {
332 fn name(&self) -> &str {
333 Self::NAME
334 }
335
336 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
337 Ok(DataType::Boolean)
338 }
339
340 fn signature(&self) -> &Signature {
341 &self.0.signature
342 }
343
344 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
345 self.0.invoke(args, |len: usize| {
346 BoolResultBuilder(BooleanBuilder::with_capacity(len))
347 })
348 }
349}
350
351fn jsonb_get(
352 jsons: &BinaryViewArray,
353 paths: &StringViewArray,
354 builder: &mut dyn JsonGetResultBuilder,
355) -> Result<()> {
356 let size = jsons.len();
357 for i in 0..size {
358 let json = jsons.is_valid(i).then(|| jsons.value(i));
359 let path = paths.is_valid(i).then(|| paths.value(i));
360 let result = match (json, path) {
361 (Some(json), Some(path)) => get_json_by_path(json, path),
362 _ => None,
363 };
364 if let Some(v) = result {
365 builder.append_value(JsonResultValue::Jsonb(v))?;
366 } else {
367 builder.append_null();
368 }
369 }
370 Ok(())
371}
372
373fn json_struct_get(
374 jsons: &StructArray,
375 paths: &StringViewArray,
376 builder: &mut dyn JsonGetResultBuilder,
377) -> Result<()> {
378 let size = jsons.len();
379 for i in 0..size {
380 if jsons.is_null(i) || paths.is_null(i) {
381 builder.append_null();
382 continue;
383 }
384 let path = paths.value(i);
385
386 let field_path = path.trim().replace("$.", "");
388 let column = jsons.column_by_name(&field_path);
389
390 if let Some(column) = column {
391 builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
392 } else {
393 let Some(raw) = jsons
394 .column_by_name(JsonStructureSettings::RAW_FIELD)
395 .and_then(|x| string_array_value_at_index(x, i))
396 else {
397 builder.append_null();
398 continue;
399 };
400
401 let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
402 DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
403 })?;
404 let value = json_struct_to_value(raw, jsons, i)?;
407
408 match path.find(&value) {
409 Value::Null => builder.append_null(),
410 Value::Array(values) => match values.as_slice() {
411 [] => builder.append_null(),
412 [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
413 _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
414 },
415 value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
416 }
417 }
418 }
419
420 Ok(())
421}
422
423fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
424 let Ok(mut json) = Value::from_str(raw) else {
425 return Err(DataFusionError::Internal(format!(
426 "inner field '{}' is not a valid JSON string",
427 JsonStructureSettings::RAW_FIELD
428 )));
429 };
430
431 for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
432 if column_name == JsonStructureSettings::RAW_FIELD {
433 continue;
434 }
435
436 let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
437 {
438 let json_pointer = format!("/{}", json_object.replace(".", "/"));
439 (json_pointer, field)
440 } else {
441 ("".to_string(), column_name)
442 };
443 let Some(json_object) = json
444 .pointer_mut(&json_pointer)
445 .and_then(|x| x.as_object_mut())
446 else {
447 return Err(DataFusionError::Internal(format!(
448 "value at JSON pointer '{}' is not an object",
449 json_pointer
450 )));
451 };
452
453 macro_rules! insert {
454 ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
455 if let Some(value) = $column
456 .is_valid($i)
457 .then(|| serde_json::Value::from($column.value($i)))
458 {
459 $json_object.insert($field.to_string(), value);
460 }
461 }};
462 }
463
464 match column.data_type() {
465 DataType::Boolean => {
467 let column = column.as_boolean();
468 insert!(column, i, json_object, field);
469 }
470 DataType::Int64 => {
472 let column = column.as_primitive::<Int64Type>();
473 insert!(column, i, json_object, field);
474 }
475 DataType::UInt64 => {
476 let column = column.as_primitive::<UInt64Type>();
477 insert!(column, i, json_object, field);
478 }
479 DataType::Float64 => {
480 let column = column.as_primitive::<Float64Type>();
481 insert!(column, i, json_object, field);
482 }
483 DataType::Utf8 => {
485 let column = column.as_string::<i32>();
486 insert!(column, i, json_object, field);
487 }
488 DataType::LargeUtf8 => {
489 let column = column.as_string::<i64>();
490 insert!(column, i, json_object, field);
491 }
492 DataType::Utf8View => {
493 let column = column.as_string_view();
494 insert!(column, i, json_object, field);
495 }
496 _ => {
498 return Err(DataFusionError::NotImplemented(format!(
499 "{} is not yet supported to be executed with field {} of datatype {}",
500 JsonGetString::NAME,
501 column_name,
502 column.data_type()
503 )));
504 }
505 }
506 }
507 Ok(json)
508}
509
/// Scalar function `json_get`: looks up a JSON path and returns the result in the type named
/// by its third argument.
///
/// Displays as the upper-cased function name.
#[derive(Debug, Display)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub(super) struct JsonGetWithType {
    signature: Signature,
}

impl JsonGetWithType {
    /// Name reported by `Function::name` for this function.
    const NAME: &'static str = "json_get";
}
519
520impl Default for JsonGetWithType {
521 fn default() -> Self {
522 Self {
523 signature: Signature::any(3, Volatility::Immutable),
524 }
525 }
526}
527
528impl Function for JsonGetWithType {
529 fn name(&self) -> &str {
530 Self::NAME
531 }
532
533 fn return_type(&self, _input_types: &[DataType]) -> datafusion_common::Result<DataType> {
534 Err(DataFusionError::Internal(
535 "This method isn't meant to be called".to_string(),
536 ))
537 }
538
539 fn return_field_from_args(
540 &self,
541 args: datafusion_expr::ReturnFieldArgs<'_>,
542 ) -> datafusion_common::Result<Arc<Field>> {
543 match args.scalar_arguments[2] {
544 Some(ScalarValue::Utf8(Some(type_str)))
545 | Some(ScalarValue::Utf8View(Some(type_str)))
546 | Some(ScalarValue::LargeUtf8(Some(type_str))) => {
547 let type_str = type_str.to_ascii_lowercase();
548 match type_str.as_str() {
549 "bool" | "boolean" => {
550 Ok(Arc::new(Field::new(self.name(), DataType::Boolean, true)))
551 }
552 "int" | "integer" => {
553 Ok(Arc::new(Field::new(self.name(), DataType::Int64, true)))
554 }
555 "float" | "double" => {
556 Ok(Arc::new(Field::new(self.name(), DataType::Float64, true)))
557 }
558 "string" => Ok(Arc::new(Field::new(self.name(), DataType::Utf8View, true))),
559 _ => Err(DataFusionError::Internal(format!(
560 "Unsupported type: {}",
561 type_str
562 ))),
563 }
564 }
565 _ => Err(DataFusionError::Internal(
566 "Invalid argument provided for type".to_string(),
567 )),
568 }
569 }
570
571 fn signature(&self) -> &Signature {
572 &self.signature
573 }
574
575 fn invoke_with_args(
576 &self,
577 args: ScalarFunctionArgs,
578 ) -> datafusion_common::Result<ColumnarValue> {
579 let [arg0, arg1, _] = extract_args("JSON_GET", &args)?;
580 let len = arg0.len();
581
582 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
583 let paths = arg1.as_string_view();
584
585 let mut builder: Box<dyn JsonGetResultBuilder> = match args.return_field.data_type() {
587 DataType::Utf8View => {
588 Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
589 }
590 DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
591 DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
592 DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
593 _ => {
594 return Err(DataFusionError::Internal(
595 "Unsupported return type".to_string(),
596 ));
597 }
598 };
599
600 match arg0.data_type() {
601 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
602 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
603 let jsons = arg0.as_binary_view();
604 jsonb_get(jsons, paths, builder.as_mut())?;
605 }
606 DataType::Struct(_) => {
607 let jsons = arg0.as_struct();
608 json_struct_get(jsons, paths, builder.as_mut())?;
609 }
610 _ => {
611 return Err(DataFusionError::Execution(format!(
612 "JSON_GET not supported argument type {}",
613 arg0.data_type(),
614 )));
615 }
616 };
617
618 Ok(ColumnarValue::Array(builder.build()))
619 }
620}
621
/// Scalar function `json_get_object`: looks up a JSON path in a JSONB value and returns the
/// result as JSONB only when it is a JSON object (null otherwise).
///
/// Displays as the upper-cased function name.
#[derive(Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub(super) struct JsonGetObject {
    signature: Signature,
}

impl JsonGetObject {
    /// Name reported by `Function::name` for this function.
    const NAME: &'static str = "json_get_object";
}
632
633impl Default for JsonGetObject {
634 fn default() -> Self {
635 Self {
636 signature: helper::one_of_sigs2(
637 vec![
638 DataType::Binary,
639 DataType::LargeBinary,
640 DataType::BinaryView,
641 ],
642 vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
643 ),
644 }
645 }
646}
647
648impl Function for JsonGetObject {
649 fn name(&self) -> &str {
650 Self::NAME
651 }
652
653 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
654 Ok(DataType::BinaryView)
655 }
656
657 fn signature(&self) -> &Signature {
658 &self.signature
659 }
660
661 fn invoke_with_args(
662 &self,
663 args: ScalarFunctionArgs,
664 ) -> datafusion_common::Result<ColumnarValue> {
665 let [arg0, arg1] = extract_args(self.name(), &args)?;
666 let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
667 let jsons = arg0.as_binary_view();
668 let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
669 let paths = arg1.as_string_view();
670
671 let len = jsons.len();
672 let mut builder = BinaryViewBuilder::with_capacity(len);
673
674 for i in 0..len {
675 let json = jsons.is_valid(i).then(|| jsons.value(i));
676 let path = paths.is_valid(i).then(|| paths.value(i));
677 let result = if let (Some(json), Some(path)) = (json, path) {
678 let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
679 let mut data = Vec::new();
680 let mut offset = Vec::new();
681 jsonb::get_by_path(json, path, &mut data, &mut offset)
682 .map(|()| jsonb::is_object(&data).then_some(data))
683 });
684 result.map_err(|e| DataFusionError::Execution(e.to_string()))?
685 } else {
686 None
687 };
688 builder.append_option(result);
689 }
690
691 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
692 }
693}
694
695#[cfg(test)]
696mod tests {
697 use std::sync::Arc;
698
699 use arrow::array::{Float64Array, Int64Array, StructArray};
700 use arrow_schema::Field;
701 use datafusion_common::ScalarValue;
702 use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
703 use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
704 use datatypes::types::parse_string_to_jsonb;
705 use serde_json::json;
706
707 use super::*;
708
709 fn test_json_struct() -> ArrayRef {
725 Arc::new(StructArray::new(
726 vec![
727 Field::new("kind", DataType::Utf8, true),
728 Field::new("payload.code", DataType::Int64, true),
729 Field::new("payload.result.time_cost", DataType::Float64, true),
730 Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
731 ]
732 .into(),
733 vec![
734 Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
735 Arc::new(Int64Array::from_iter([Some(404)])),
736 Arc::new(Float64Array::from_iter([Some(1.234)])),
737 Arc::new(StringViewArray::from_iter([Some(
738 json! ({
739 "payload": {
740 "success": false,
741 "result": {
742 "error": "not found"
743 }
744 }
745 })
746 .to_string(),
747 )])),
748 ],
749 None,
750 ))
751 }
752
753 #[test]
754 fn test_json_get_int() {
755 let json_get_int = JsonGetInt::default();
756
757 assert_eq!("json_get_int", json_get_int.name());
758 assert_eq!(
759 DataType::Int64,
760 json_get_int
761 .return_type(&[DataType::Binary, DataType::Utf8])
762 .unwrap()
763 );
764
765 let json_strings = [
766 r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
767 r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
768 r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
769 ];
770 let json_struct = test_json_struct();
771
772 let path_expects = vec![
773 ("$.a.b", Some(2)),
774 ("$.a", Some(4)),
775 ("$.c", None),
776 ("$.kind", None),
777 ("$.payload.code", Some(404)),
778 ("$.payload.success", None),
779 ("$.payload.result.time_cost", None),
780 ("$.payload.not-exists", None),
781 ("$.not-exists", None),
782 ("$", None),
783 ];
784
785 let mut jsons = json_strings
786 .iter()
787 .map(|s| {
788 let value = jsonb::parse_value(s.as_bytes()).unwrap();
789 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
790 })
791 .collect::<Vec<_>>();
792 let json_struct_arrays =
793 std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
794 jsons.extend(json_struct_arrays);
795
796 for i in 0..jsons.len() {
797 let json = &jsons[i];
798 let (path, expect) = path_expects[i];
799
800 let args = ScalarFunctionArgs {
801 args: vec![
802 ColumnarValue::Array(json.clone()),
803 ColumnarValue::Scalar(path.into()),
804 ],
805 arg_fields: vec![],
806 number_rows: 1,
807 return_field: Arc::new(Field::new("x", DataType::Int64, false)),
808 config_options: Arc::new(Default::default()),
809 };
810 let result = json_get_int
811 .invoke_with_args(args)
812 .and_then(|x| x.to_array(1))
813 .unwrap();
814
815 let result = result.as_primitive::<Int64Type>();
816 assert_eq!(1, result.len());
817 let actual = result.is_valid(0).then(|| result.value(0));
818 assert_eq!(actual, expect);
819 }
820 }
821
822 #[test]
823 fn test_json_get_float() {
824 let json_get_float = JsonGetFloat::default();
825
826 assert_eq!("json_get_float", json_get_float.name());
827 assert_eq!(
828 DataType::Float64,
829 json_get_float
830 .return_type(&[DataType::Binary, DataType::Utf8])
831 .unwrap()
832 );
833
834 let json_strings = [
835 r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
836 r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
837 r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
838 ];
839 let json_struct = test_json_struct();
840
841 let path_expects = vec![
842 ("$.a.b", Some(2.1)),
843 ("$.a", Some(4.4)),
844 ("$.c", None),
845 ("$.kind", None),
846 ("$.payload.code", None),
847 ("$.payload.success", None),
848 ("$.payload.result.time_cost", Some(1.234)),
849 ("$.payload.not-exists", None),
850 ("$.not-exists", None),
851 ("$", None),
852 ];
853
854 let mut jsons = json_strings
855 .iter()
856 .map(|s| {
857 let value = jsonb::parse_value(s.as_bytes()).unwrap();
858 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
859 })
860 .collect::<Vec<_>>();
861 let json_struct_arrays =
862 std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
863 jsons.extend(json_struct_arrays);
864
865 for i in 0..jsons.len() {
866 let json = &jsons[i];
867 let (path, expect) = path_expects[i];
868
869 let args = ScalarFunctionArgs {
870 args: vec![
871 ColumnarValue::Array(json.clone()),
872 ColumnarValue::Scalar(path.into()),
873 ],
874 arg_fields: vec![],
875 number_rows: 1,
876 return_field: Arc::new(Field::new("x", DataType::Float64, false)),
877 config_options: Arc::new(Default::default()),
878 };
879 let result = json_get_float
880 .invoke_with_args(args)
881 .and_then(|x| x.to_array(1))
882 .unwrap();
883
884 let result = result.as_primitive::<Float64Type>();
885 assert_eq!(1, result.len());
886 let actual = result.is_valid(0).then(|| result.value(0));
887 assert_eq!(actual, expect);
888 }
889 }
890
891 #[test]
892 fn test_json_get_bool() {
893 let json_get_bool = JsonGetBool::default();
894
895 assert_eq!("json_get_bool", json_get_bool.name());
896 assert_eq!(
897 DataType::Boolean,
898 json_get_bool
899 .return_type(&[DataType::Binary, DataType::Utf8])
900 .unwrap()
901 );
902
903 let json_strings = [
904 r#"{"a": {"b": true}, "b": false, "c": true}"#,
905 r#"{"a": false, "b": {"c": true}, "c": false}"#,
906 r#"{"a": true, "b": false, "c": {"a": true}}"#,
907 ];
908 let json_struct = test_json_struct();
909
910 let path_expects = vec![
911 ("$.a.b", Some(true)),
912 ("$.a", Some(false)),
913 ("$.c", None),
914 ("$.kind", None),
915 ("$.payload.code", None),
916 ("$.payload.success", Some(false)),
917 ("$.payload.result.time_cost", None),
918 ("$.payload.not-exists", None),
919 ("$.not-exists", None),
920 ("$", None),
921 ];
922
923 let mut jsons = json_strings
924 .iter()
925 .map(|s| {
926 let value = jsonb::parse_value(s.as_bytes()).unwrap();
927 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
928 })
929 .collect::<Vec<_>>();
930 let json_struct_arrays =
931 std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
932 jsons.extend(json_struct_arrays);
933
934 for i in 0..jsons.len() {
935 let json = &jsons[i];
936 let (path, expect) = path_expects[i];
937
938 let args = ScalarFunctionArgs {
939 args: vec![
940 ColumnarValue::Array(json.clone()),
941 ColumnarValue::Scalar(path.into()),
942 ],
943 arg_fields: vec![],
944 number_rows: 1,
945 return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
946 config_options: Arc::new(Default::default()),
947 };
948 let result = json_get_bool
949 .invoke_with_args(args)
950 .and_then(|x| x.to_array(1))
951 .unwrap();
952
953 let result = result.as_boolean();
954 assert_eq!(1, result.len());
955 let actual = result.is_valid(0).then(|| result.value(0));
956 assert_eq!(actual, expect);
957 }
958 }
959
960 #[test]
961 fn test_json_get_string() {
962 let json_get_string = JsonGetString::default();
963
964 assert_eq!("json_get_string", json_get_string.name());
965 assert_eq!(
966 DataType::Utf8View,
967 json_get_string
968 .return_type(&[DataType::Binary, DataType::Utf8])
969 .unwrap()
970 );
971
972 let json_strings = [
973 r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
974 r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
975 r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
976 ];
977 let json_struct = test_json_struct();
978
979 let paths = vec![
980 "$.a.b",
981 "$.a",
982 "",
983 "$.kind",
984 "$.payload.code",
985 "$.payload.result.time_cost",
986 "$.payload",
987 "$.payload.success",
988 "$.payload.result",
989 "$.payload.result.error",
990 "$.payload.result.not-exists",
991 "$.payload.not-exists",
992 "$.not-exists",
993 "$",
994 ];
995 let expects = [
996 Some("a"),
997 Some("d"),
998 None,
999 Some("foo"),
1000 Some("404"),
1001 Some("1.234"),
1002 Some(
1003 r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
1004 ),
1005 Some("false"),
1006 Some(r#"{"error":"not found","time_cost":1.234}"#),
1007 Some("not found"),
1008 None,
1009 None,
1010 None,
1011 Some(
1012 r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1013 ),
1014 ];
1015
1016 let mut jsons = json_strings
1017 .iter()
1018 .map(|s| {
1019 let value = jsonb::parse_value(s.as_bytes()).unwrap();
1020 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1021 })
1022 .collect::<Vec<_>>();
1023 let json_struct_arrays =
1024 std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1025 jsons.extend(json_struct_arrays);
1026
1027 for i in 0..jsons.len() {
1028 let json = &jsons[i];
1029 let path = paths[i];
1030 let expect = expects[i];
1031
1032 let args = ScalarFunctionArgs {
1033 args: vec![
1034 ColumnarValue::Array(json.clone()),
1035 ColumnarValue::Scalar(path.into()),
1036 ],
1037 arg_fields: vec![],
1038 number_rows: 1,
1039 return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1040 config_options: Arc::new(Default::default()),
1041 };
1042 let result = json_get_string
1043 .invoke_with_args(args)
1044 .and_then(|x| x.to_array(1))
1045 .unwrap();
1046
1047 let result = result.as_string_view();
1048 assert_eq!(1, result.len());
1049 let actual = result.is_valid(0).then(|| result.value(0));
1050 assert_eq!(actual, expect);
1051 }
1052 }
1053
1054 #[test]
1055 fn test_json_get_object() -> Result<()> {
1056 let udf = JsonGetObject::default();
1057 assert_eq!("json_get_object", udf.name());
1058 assert_eq!(
1059 DataType::BinaryView,
1060 udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
1061 );
1062
1063 let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
1064 let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
1065 let number_rows = paths.len();
1066
1067 let args = ScalarFunctionArgs {
1068 args: vec![
1069 ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
1070 ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
1071 ],
1072 arg_fields: vec![],
1073 number_rows,
1074 return_field: Arc::new(Field::new("x", DataType::Binary, false)),
1075 config_options: Arc::new(Default::default()),
1076 };
1077 let result = udf
1078 .invoke_with_args(args)
1079 .and_then(|x| x.to_array(number_rows))?;
1080 let result = result.as_binary_view();
1081
1082 let expected = &BinaryViewArray::from_iter(
1083 vec![
1084 Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
1085 Some(r#"{"b": {"c": {"d": 1}}}"#),
1086 Some(r#"{"c": {"d": 1}}"#),
1087 Some(r#"{"d": 1}"#),
1088 None,
1089 None,
1090 None,
1091 ]
1092 .into_iter()
1093 .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
1094 );
1095 assert_eq!(result, expected);
1096 Ok(())
1097 }
1098
1099 #[test]
1100 fn test_json_get_with_type() {
1101 let json_get_with_type = JsonGetWithType::default();
1102
1103 assert_eq!("json_get", json_get_with_type.name());
1104
1105 let json_strings = [
1106 r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
1107 r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
1108 r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
1109 ];
1110 let json_struct = test_json_struct();
1111
1112 let paths = vec![
1113 "$.a.b",
1114 "$.a",
1115 "",
1116 "$.kind",
1117 "$.payload.code",
1118 "$.payload.result.time_cost",
1119 "$.payload",
1120 "$.payload.success",
1121 "$.payload.result",
1122 "$.payload.result.error",
1123 "$.payload.result.not-exists",
1124 "$.payload.not-exists",
1125 "$.not-exists",
1126 "$",
1127 ];
1128 let expects = [
1129 Some("a"),
1130 Some("d"),
1131 None,
1132 Some("foo"),
1133 Some("404"),
1134 Some("1.234"),
1135 Some(
1136 r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
1137 ),
1138 Some("false"),
1139 Some(r#"{"error":"not found","time_cost":1.234}"#),
1140 Some("not found"),
1141 None,
1142 None,
1143 None,
1144 Some(
1145 r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1146 ),
1147 ];
1148
1149 let mut jsons = json_strings
1150 .iter()
1151 .map(|s| {
1152 let value = jsonb::parse_value(s.as_bytes()).unwrap();
1153 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1154 })
1155 .collect::<Vec<_>>();
1156 let json_struct_arrays =
1157 std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1158 jsons.extend(json_struct_arrays);
1159
1160 for i in 0..jsons.len() {
1161 let json = &jsons[i];
1162 let path = paths[i];
1163 let expect = expects[i];
1164
1165 let args = ScalarFunctionArgs {
1166 args: vec![
1167 ColumnarValue::Array(json.clone()),
1168 ColumnarValue::Scalar(path.into()),
1169 ColumnarValue::Scalar(ScalarValue::Utf8(Some("string".to_string()))),
1170 ],
1171 arg_fields: vec![],
1172 number_rows: 1,
1173 return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1174 config_options: Arc::new(Default::default()),
1175 };
1176 let result = json_get_with_type
1177 .invoke_with_args(args)
1178 .and_then(|x| x.to_array(1))
1179 .unwrap();
1180
1181 let result = result.as_string_view();
1182 assert_eq!(1, result.len());
1183 let actual = result.is_valid(0).then(|| result.value(0));
1184 assert_eq!(actual, expect);
1185 }
1186
1187 let json_strings = [
1188 r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
1189 r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
1190 r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
1191 ];
1192 let paths = ["$.a.b", "$.a", "$.c", "$.payload.code"];
1193 let expects = [Some(2), Some(4), None, Some(404)];
1194
1195 for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1196 let json = if i < json_strings.len() {
1197 let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1198 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1199 } else {
1200 test_json_struct()
1201 };
1202
1203 let args = ScalarFunctionArgs {
1204 args: vec![
1205 ColumnarValue::Array(json),
1206 ColumnarValue::Scalar((*path).into()),
1207 ColumnarValue::Scalar(ScalarValue::Utf8(Some("int".to_string()))),
1208 ],
1209 arg_fields: vec![],
1210 number_rows: 1,
1211 return_field: Arc::new(Field::new("x", DataType::Int64, false)),
1212 config_options: Arc::new(Default::default()),
1213 };
1214 let result = json_get_with_type
1215 .invoke_with_args(args)
1216 .and_then(|x| x.to_array(1))
1217 .unwrap();
1218
1219 let result = result.as_primitive::<Int64Type>();
1220 assert_eq!(1, result.len());
1221 let actual = result.is_valid(0).then(|| result.value(0));
1222 assert_eq!(actual, *expect);
1223 }
1224
1225 let json_strings = [
1226 r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
1227 r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
1228 r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
1229 ];
1230 let paths = ["$.a.b", "$.a", "$.c", "$.payload.result.time_cost"];
1231 let expects = [Some(2.1), Some(4.4), None, Some(1.234)];
1232
1233 for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1234 let json = if i < json_strings.len() {
1235 let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1236 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1237 } else {
1238 test_json_struct()
1239 };
1240
1241 let args = ScalarFunctionArgs {
1242 args: vec![
1243 ColumnarValue::Array(json),
1244 ColumnarValue::Scalar((*path).into()),
1245 ColumnarValue::Scalar(ScalarValue::Utf8(Some("float".to_string()))),
1246 ],
1247 arg_fields: vec![],
1248 number_rows: 1,
1249 return_field: Arc::new(Field::new("x", DataType::Float64, false)),
1250 config_options: Arc::new(Default::default()),
1251 };
1252 let result = json_get_with_type
1253 .invoke_with_args(args)
1254 .and_then(|x| x.to_array(1))
1255 .unwrap();
1256
1257 let result = result.as_primitive::<Float64Type>();
1258 assert_eq!(1, result.len());
1259 let actual = result.is_valid(0).then(|| result.value(0));
1260 assert_eq!(actual, *expect);
1261 }
1262
1263 let json_strings = [
1264 r#"{"a": {"b": true}, "b": false, "c": true}"#,
1265 r#"{"a": false, "b": {"c": true}, "c": false}"#,
1266 r#"{"a": true, "b": false, "c": {"a": true}}"#,
1267 ];
1268 let paths = ["$.a.b", "$.a", "$.c", "$.payload.success"];
1269 let expects = [Some(true), Some(false), None, Some(false)];
1270
1271 for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1272 let json = if i < json_strings.len() {
1273 let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1274 Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1275 } else {
1276 test_json_struct()
1277 };
1278
1279 let args = ScalarFunctionArgs {
1280 args: vec![
1281 ColumnarValue::Array(json),
1282 ColumnarValue::Scalar((*path).into()),
1283 ColumnarValue::Scalar(ScalarValue::Utf8(Some("bool".to_string()))),
1284 ],
1285 arg_fields: vec![],
1286 number_rows: 1,
1287 return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
1288 config_options: Arc::new(Default::default()),
1289 };
1290 let result = json_get_with_type
1291 .invoke_with_args(args)
1292 .and_then(|x| x.to_array(1))
1293 .unwrap();
1294
1295 let result = result.as_boolean();
1296 assert_eq!(1, result.len());
1297 let actual = result.is_valid(0).then(|| result.value(0));
1298 assert_eq!(actual, *expect);
1299 }
1300 }
1301}