common_function/scalars/json/
parse_json.rs1use std::fmt::{self, Display};
16
17use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
18use common_query::prelude::Signature;
19use datafusion::logical_expr::Volatility;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::prelude::VectorRef;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{BinaryVectorBuilder, MutableVector};
24use snafu::ensure;
25
26use crate::function::{Function, FunctionContext};
27
/// Scalar function `parse_json(string) -> json`: turns JSON text into its
/// JSONB binary encoding (see the `Function` impl for evaluation details).
#[derive(Default, Debug, Clone)]
pub struct ParseJsonFunction;

/// Name reported by `ParseJsonFunction::name`.
const NAME: &str = "parse_json";
33
34impl Function for ParseJsonFunction {
35 fn name(&self) -> &str {
36 NAME
37 }
38
39 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
40 Ok(ConcreteDataType::json_datatype())
41 }
42
43 fn signature(&self) -> Signature {
44 Signature::exact(
45 vec![ConcreteDataType::string_datatype()],
46 Volatility::Immutable,
47 )
48 }
49
50 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
51 ensure!(
52 columns.len() == 1,
53 InvalidFuncArgsSnafu {
54 err_msg: format!(
55 "The length of the args is not correct, expect exactly one, have: {}",
56 columns.len()
57 ),
58 }
59 );
60 let json_strings = &columns[0];
61
62 let size = json_strings.len();
63 let datatype = json_strings.data_type();
64 let mut results = BinaryVectorBuilder::with_capacity(size);
65
66 match datatype {
67 ConcreteDataType::String(_) => {
68 for i in 0..size {
69 let json_string = json_strings.get_ref(i);
70
71 let json_string = json_string.as_string();
72 let result = match json_string {
73 Ok(Some(json_string)) => match jsonb::parse_value(json_string.as_bytes()) {
74 Ok(json) => Some(json.to_vec()),
75 Err(_) => {
76 return InvalidFuncArgsSnafu {
77 err_msg: format!(
78 "Cannot convert the string to json, have: {}",
79 json_string
80 ),
81 }
82 .fail()
83 }
84 },
85 _ => None,
86 };
87
88 results.push(result.as_deref());
89 }
90 }
91 _ => {
92 return UnsupportedInputDataTypeSnafu {
93 function: NAME,
94 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
95 }
96 .fail();
97 }
98 }
99
100 Ok(results.to_vector())
101 }
102}
103
104impl Display for ParseJsonFunction {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 write!(f, "PARSE_JSON")
107 }
108}
109
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::prelude::TypeSignature;
    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::StringVector;

    use super::*;

    /// Checks the function's metadata and that valid JSON strings are encoded
    /// exactly as `jsonb::parse_value(..).to_vec()` would encode them.
    #[test]
    fn test_parse_json_function() {
        let parse_json = ParseJsonFunction;

        assert_eq!("parse_json", parse_json.name());
        // The declared input type is string, so query the return type with it.
        assert_eq!(
            ConcreteDataType::json_datatype(),
            parse_json
                .return_type(&[ConcreteDataType::string_datatype()])
                .unwrap()
        );

        assert!(matches!(parse_json.signature(),
                         Signature {
                             type_signature: TypeSignature::Exact(valid_types),
                             volatility: Volatility::Immutable
                         } if valid_types == vec![ConcreteDataType::string_datatype()]
        ));

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let json_string_vector = StringVector::from_vec(json_strings.to_vec());
        let args: Vec<VectorRef> = vec![Arc::new(json_string_vector)];
        let vector = parse_json.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (i, gt) in jsonbs.iter().enumerate() {
            let result = vector.get_ref(i);
            let result = result.as_binary().unwrap().unwrap();
            assert_eq!(gt, result);
        }
    }

    /// A null input row must produce a null output row, not an error.
    #[test]
    fn test_parse_json_null_input() {
        let parse_json = ParseJsonFunction;
        let args: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
            Some(r#"{"a": 1}"#),
            None,
        ]))];
        let vector = parse_json.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(2, vector.len());
        let expected = jsonb::parse_value(r#"{"a": 1}"#.as_bytes())
            .unwrap()
            .to_vec();
        assert_eq!(
            expected.as_slice(),
            vector.get_ref(0).as_binary().unwrap().unwrap()
        );
        assert!(vector.get_ref(1).as_binary().unwrap().is_none());
    }

    /// Malformed JSON and a wrong argument count are both rejected.
    #[test]
    fn test_parse_json_invalid_input() {
        let parse_json = ParseJsonFunction;

        let args: Vec<VectorRef> = vec![Arc::new(StringVector::from_vec(vec![r#"{"a": "#]))];
        assert!(parse_json.eval(&FunctionContext::default(), &args).is_err());

        let no_args: Vec<VectorRef> = vec![];
        assert!(parse_json
            .eval(&FunctionContext::default(), &no_args)
            .is_err());
    }
}