common_function/scalars/json/
json_to_string.rs1use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use datafusion_common::DataFusionError;
19use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
20use datafusion_common::arrow::datatypes::DataType;
21use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
22
23use crate::function::{Function, extract_args};
24
25#[derive(Clone, Debug)]
27pub(crate) struct JsonToStringFunction {
28 signature: Signature,
29}
30
31impl Default for JsonToStringFunction {
32 fn default() -> Self {
33 Self {
34 signature: Signature::exact(vec![DataType::Binary], Volatility::Immutable),
36 }
37 }
38}
39
/// Name returned by `Function::name` for this function.
const NAME: &str = "json_to_string";
41
42impl Function for JsonToStringFunction {
43 fn name(&self) -> &str {
44 NAME
45 }
46
47 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
48 Ok(DataType::Utf8View)
49 }
50
51 fn signature(&self) -> &Signature {
52 &self.signature
53 }
54
55 fn invoke_with_args(
56 &self,
57 args: ScalarFunctionArgs,
58 ) -> datafusion_common::Result<ColumnarValue> {
59 let [arg0] = extract_args(self.name(), &args)?;
60 let jsons = arg0.as_binary::<i32>();
61
62 let size = jsons.len();
63 let mut builder = StringViewBuilder::with_capacity(size);
64
65 for i in 0..size {
66 let json = jsons.is_valid(i).then(|| jsons.value(i));
67 let result = json
68 .map(|json| jsonb::from_slice(json).map(|x| x.to_string()))
69 .transpose()
70 .map_err(|e| DataFusionError::Execution(format!("invalid json binary: {e}")))?;
71
72 builder.append_option(result.as_deref());
73 }
74
75 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
76 }
77}
78
79impl Display for JsonToStringFunction {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 write!(f, "JSON_TO_STRING")
82 }
83}
84
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::arrow::array::BinaryArray;

    use super::*;

    /// Wraps `input` as the single array argument of a scalar-function call,
    /// keeping `number_rows` in sync with the array length.
    fn args_for(input: BinaryArray) -> ScalarFunctionArgs {
        let number_rows = input.len();
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(Arc::new(input))],
            arg_fields: vec![],
            number_rows,
            // Nullable, because null inputs produce null outputs.
            return_field: Arc::new(Field::new("x", DataType::Utf8View, true)),
            config_options: Arc::new(Default::default()),
        }
    }

    #[test]
    fn test_json_to_string_function() {
        let json_to_string = JsonToStringFunction::default();

        assert_eq!("json_to_string", json_to_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_to_string.return_type(&[DataType::Binary]).unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];

        let jsonbs = json_strings
            .iter()
            .map(|s| jsonb::parse_value(s.as_bytes()).unwrap().to_vec())
            .collect::<Vec<_>>();

        let result = json_to_string
            .invoke_with_args(args_for(BinaryArray::from_iter_values(jsonbs)))
            .and_then(|x| x.to_array(json_strings.len()))
            .unwrap();
        let vector = result.as_string_view();

        assert_eq!(json_strings.len(), vector.len());
        for (i, gt) in json_strings.iter().enumerate() {
            // The rendered text carries no whitespace between tokens.
            assert_eq!(gt.replace(" ", ""), vector.value(i));
        }
    }

    #[test]
    fn test_json_to_string_null_input() {
        let json_to_string = JsonToStringFunction::default();

        let encoded = jsonb::parse_value(br#"{"a": 1}"#).unwrap().to_vec();
        let input = BinaryArray::from(vec![Some(encoded.as_slice()), None]);

        let result = json_to_string
            .invoke_with_args(args_for(input))
            .and_then(|x| x.to_array(2))
            .unwrap();
        let vector = result.as_string_view();

        assert_eq!(2, vector.len());
        assert_eq!(r#"{"a":1}"#, vector.value(0));
        // A null input row must surface as a null output row, not an error.
        assert!(vector.is_null(1));
    }

    #[test]
    fn test_json_to_string_invalid_input() {
        let json_to_string = JsonToStringFunction::default();

        let invalid_jsonb = vec![b"invalid json"];
        let result =
            json_to_string.invoke_with_args(args_for(BinaryArray::from_iter_values(invalid_jsonb)));
        assert!(result.is_err());
    }
}