common_function/scalars/json/
parse_json.rs1use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use datafusion_common::DataFusionError;
19use datafusion_common::arrow::array::{Array, AsArray, BinaryViewBuilder};
20use datafusion_common::arrow::compute;
21use datafusion_common::arrow::datatypes::DataType;
22use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
23
24use crate::function::{Function, extract_args};
25
26#[derive(Clone, Debug)]
28pub(crate) struct ParseJsonFunction {
29 signature: Signature,
30}
31
32impl Default for ParseJsonFunction {
33 fn default() -> Self {
34 Self {
35 signature: Signature::string(1, Volatility::Immutable),
36 }
37 }
38}
39
/// Name reported by `ParseJsonFunction::name`.
const NAME: &str = "parse_json";
41
42impl Function for ParseJsonFunction {
43 fn name(&self) -> &str {
44 NAME
45 }
46
47 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
48 Ok(DataType::BinaryView)
49 }
50
51 fn signature(&self) -> &Signature {
52 &self.signature
53 }
54
55 fn invoke_with_args(
56 &self,
57 args: ScalarFunctionArgs,
58 ) -> datafusion_common::Result<ColumnarValue> {
59 let [arg0] = extract_args(self.name(), &args)?;
60 let arg0 = compute::cast(&arg0, &DataType::Utf8View)?;
61 let json_strings = arg0.as_string_view();
62
63 let size = json_strings.len();
64 let mut builder = BinaryViewBuilder::with_capacity(size);
65
66 for i in 0..size {
67 let s = json_strings.is_valid(i).then(|| json_strings.value(i));
68 let result = s
69 .map(|s| {
70 jsonb::parse_value(s.as_bytes())
71 .map(|x| x.to_vec())
72 .map_err(|e| DataFusionError::Execution(format!("cannot parse '{s}': {e}")))
73 })
74 .transpose()?;
75 builder.append_option(result.as_deref());
76 }
77
78 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
79 }
80}
81
82impl Display for ParseJsonFunction {
83 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84 write!(f, "PARSE_JSON")
85 }
86}
87
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::StringViewArray;

    use super::*;

    /// Wraps a single string-view column into `ScalarFunctionArgs`, with
    /// `number_rows` taken from the array length.
    fn string_view_args(array: StringViewArray) -> ScalarFunctionArgs {
        let number_rows = array.len();
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(array))],
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("x", DataType::BinaryView, true)),
            config_options: Arc::new(Default::default()),
        }
    }

    /// JSONB encoding of `s`, computed independently of the function under test.
    fn expected_jsonb(s: &str) -> Vec<u8> {
        jsonb::parse_value(s.as_bytes()).unwrap().to_vec()
    }

    #[test]
    fn test_parse_json_function() {
        let parse_json = ParseJsonFunction::default();

        assert_eq!("parse_json", parse_json.name());
        assert_eq!(
            DataType::BinaryView,
            parse_json.return_type(&[DataType::Utf8]).unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let jsonbs = json_strings
            .iter()
            .map(|s| expected_jsonb(s))
            .collect::<Vec<_>>();

        let args = string_view_args(StringViewArray::from_iter_values(json_strings));
        let result = parse_json
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_binary_view();

        assert_eq!(3, vector.len());
        for (i, gt) in jsonbs.iter().enumerate() {
            assert_eq!(gt, vector.value(i));
        }
    }

    #[test]
    fn test_parse_json_null_input() {
        let parse_json = ParseJsonFunction::default();
        let input = StringViewArray::from(vec![Some(r#"{"a": 1}"#), None]);

        let result = parse_json
            .invoke_with_args(string_view_args(input))
            .and_then(|x| x.to_array(2))
            .unwrap();
        let vector = result.as_binary_view();

        assert_eq!(2, vector.len());
        assert!(vector.is_valid(0));
        assert!(vector.is_null(1));
        assert_eq!(expected_jsonb(r#"{"a": 1}"#), vector.value(0));
    }

    #[test]
    fn test_parse_json_invalid_input() {
        let parse_json = ParseJsonFunction::default();
        let input = StringViewArray::from_iter_values([r#"{"a": 1"#]);

        let Err(err) = parse_json.invoke_with_args(string_view_args(input)) else {
            panic!("parsing truncated JSON should fail");
        };
        assert!(err.to_string().contains("cannot parse"));
    }
}