common_function/scalars/json/
parse_json.rs1use std::fmt::{self, Display};
16
17use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
18use datafusion::arrow::datatypes::DataType;
19use datafusion_expr::{Signature, Volatility};
20use datatypes::data_type::ConcreteDataType;
21use datatypes::prelude::VectorRef;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{BinaryVectorBuilder, MutableVector};
24use snafu::ensure;
25
26use crate::function::{Function, FunctionContext};
27
/// Scalar function registered under the name `parse_json`.
///
/// It takes a single string column and parses every row with
/// `jsonb::parse_value`, returning the encoded bytes as a binary column.
#[derive(Clone, Debug, Default)]
pub struct ParseJsonFunction;
31
/// Name returned by `Function::name` and reported in unsupported-input-type errors.
const NAME: &str = "parse_json";
33
34impl Function for ParseJsonFunction {
35 fn name(&self) -> &str {
36 NAME
37 }
38
39 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
40 Ok(DataType::Binary)
41 }
42
43 fn signature(&self) -> Signature {
44 Signature::string(1, Volatility::Immutable)
45 }
46
47 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
48 ensure!(
49 columns.len() == 1,
50 InvalidFuncArgsSnafu {
51 err_msg: format!(
52 "The length of the args is not correct, expect exactly one, have: {}",
53 columns.len()
54 ),
55 }
56 );
57 let json_strings = &columns[0];
58
59 let size = json_strings.len();
60 let datatype = json_strings.data_type();
61 let mut results = BinaryVectorBuilder::with_capacity(size);
62
63 match datatype {
64 ConcreteDataType::String(_) => {
65 for i in 0..size {
66 let json_string = json_strings.get_ref(i);
67
68 let json_string = json_string.as_string();
69 let result = match json_string {
70 Ok(Some(json_string)) => match jsonb::parse_value(json_string.as_bytes()) {
71 Ok(json) => Some(json.to_vec()),
72 Err(_) => {
73 return InvalidFuncArgsSnafu {
74 err_msg: format!(
75 "Cannot convert the string to json, have: {}",
76 json_string
77 ),
78 }
79 .fail();
80 }
81 },
82 _ => None,
83 };
84
85 results.push(result.as_deref());
86 }
87 }
88 _ => {
89 return UnsupportedInputDataTypeSnafu {
90 function: NAME,
91 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
92 }
93 .fail();
94 }
95 }
96
97 Ok(results.to_vector())
98 }
99}
100
101impl Display for ParseJsonFunction {
102 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
103 write!(f, "PARSE_JSON")
104 }
105}
106
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::StringVector;

    use super::*;

    /// Valid JSON strings must be encoded exactly as `jsonb::parse_value` encodes them.
    #[test]
    fn test_parse_json_function() {
        let parse_json = ParseJsonFunction;

        assert_eq!("parse_json", parse_json.name());
        // The function takes a string argument, so probe the return type with `Utf8`.
        assert_eq!(
            DataType::Binary,
            parse_json.return_type(&[DataType::Utf8]).unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];

        // Expected output: the reference encoding of each input string.
        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let json_string_vector = StringVector::from_vec(json_strings.to_vec());
        let args: Vec<VectorRef> = vec![Arc::new(json_string_vector)];
        let vector = parse_json.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(json_strings.len(), vector.len());
        for (i, expected) in jsonbs.iter().enumerate() {
            let result = vector.get_ref(i);
            let result = result.as_binary().unwrap().unwrap();
            assert_eq!(expected, result);
        }
    }

    /// A string that is not valid JSON must make the whole evaluation fail.
    #[test]
    fn test_parse_json_rejects_invalid_input() {
        let parse_json = ParseJsonFunction;

        // Missing closing brace.
        let malformed = StringVector::from_vec(vec![r#"{"a": 1"#]);
        let args: Vec<VectorRef> = vec![Arc::new(malformed)];

        assert!(parse_json
            .eval(&FunctionContext::default(), &args)
            .is_err());
    }

    /// `eval` requires exactly one argument column.
    #[test]
    fn test_parse_json_rejects_wrong_arg_count() {
        let parse_json = ParseJsonFunction;

        let no_args: Vec<VectorRef> = vec![];
        assert!(parse_json
            .eval(&FunctionContext::default(), &no_args)
            .is_err());

        let two_args: Vec<VectorRef> = vec![
            Arc::new(StringVector::from_vec(vec!["1"])),
            Arc::new(StringVector::from_vec(vec!["2"])),
        ];
        assert!(parse_json
            .eval(&FunctionContext::default(), &two_args)
            .is_err());
    }
}