1use std::fmt::{self, Display};
16
17use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
18use datafusion_expr::{Signature, Volatility};
19use datatypes::arrow::datatypes::DataType;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::prelude::VectorRef;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{
24 BooleanVectorBuilder, Float64VectorBuilder, Int64VectorBuilder, MutableVector,
25 StringVectorBuilder,
26};
27use snafu::ensure;
28
29use crate::function::{Function, FunctionContext};
30
31fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
32 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
33 match json_path {
34 Ok(json_path) => {
35 let mut sub_jsonb = Vec::new();
36 let mut sub_offsets = Vec::new();
37 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
38 Ok(_) => Some(sub_jsonb),
39 Err(_) => None,
40 }
41 }
42 _ => None,
43 }
44}
45
/// Generates a unit struct implementing [`Function`] and [`Display`] that
/// extracts a typed value from a JSONB column by JSON path.
///
/// Arguments:
/// - `$name`: name of the generated struct; its snake_case form is the function name.
/// - `$type`: `DataType` variant returned, also the prefix of the `<$type>VectorBuilder` used.
/// - `$rust_type`: suffix selecting the `jsonb::to_<$rust_type>` conversion.
/// - `$doc`: doc string attached to the generated struct.
///
/// A row evaluates to null when either input is null or not of the expected
/// kind, when the path lookup yields nothing, or when the conversion fails.
macro_rules! json_get {
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug, Default)]
            pub struct $name;

            impl Function for $name {
                fn name(&self) -> &str {
                    stringify!([<$name:snake>])
                }

                fn return_type(&self, _: &[DataType]) -> Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> Signature {
                    let arg_types = vec![DataType::Binary, DataType::Utf8];
                    Signature::exact(arg_types, Volatility::Immutable)
                }

                fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
                    ensure!(
                        columns.len() == 2,
                        InvalidFuncArgsSnafu {
                            err_msg: format!(
                                "The length of the args is not correct, expect exactly two, have: {}",
                                columns.len()
                            ),
                        }
                    );
                    let (json_col, path_col) = (&columns[0], &columns[1]);

                    // Only a binary first column is supported; anything else is rejected up front.
                    if !matches!(json_col.data_type(), ConcreteDataType::Binary(_)) {
                        return UnsupportedInputDataTypeSnafu {
                            function: stringify!([<$name:snake>]),
                            datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
                        }
                        .fail();
                    }

                    let row_count = json_col.len();
                    let mut builder = [<$type VectorBuilder>]::with_capacity(row_count);
                    for row in 0..row_count {
                        let json_ref = json_col.get_ref(row);
                        let path_ref = path_col.get_ref(row);

                        let value = if let (Ok(Some(json)), Ok(Some(path))) =
                            (json_ref.as_binary(), path_ref.as_string())
                        {
                            get_json_by_path(json, path)
                                .and_then(|sub| jsonb::[<to_ $rust_type>](&sub).ok())
                        } else {
                            None
                        };

                        builder.push(value);
                    }

                    Ok(builder.to_vector())
                }
            }

            impl Display for $name {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    // Displayed as the upper-cased function name.
                    f.write_str(&stringify!([<$name:snake>]).to_ascii_uppercase())
                }
            }
        }
    };
}
131
// `JsonGetInt` (function name `json_get_int`): returns `Int64`, converting with `jsonb::to_i64`.
json_get!(
    JsonGetInt,
    Int64,
    i64,
    "Get the value from the JSONB by the given path and return it as an integer."
);

// `JsonGetFloat` (function name `json_get_float`): returns `Float64`, converting with `jsonb::to_f64`.
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);

// `JsonGetBool` (function name `json_get_bool`): returns `Boolean`, converting with `jsonb::to_bool`.
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
152
/// Get the value from the JSONB by the given path and return it as a string.
#[derive(Clone, Debug, Default)]
pub struct JsonGetString;
156
157impl Function for JsonGetString {
158 fn name(&self) -> &str {
159 "json_get_string"
160 }
161
162 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
163 Ok(DataType::Utf8)
164 }
165
166 fn signature(&self) -> Signature {
167 Signature::exact(
169 vec![DataType::Binary, DataType::Utf8],
170 Volatility::Immutable,
171 )
172 }
173
174 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
175 ensure!(
176 columns.len() == 2,
177 InvalidFuncArgsSnafu {
178 err_msg: format!(
179 "The length of the args is not correct, expect exactly two, have: {}",
180 columns.len()
181 ),
182 }
183 );
184 let jsons = &columns[0];
185 let paths = &columns[1];
186
187 let size = jsons.len();
188 let datatype = jsons.data_type();
189 let mut results = StringVectorBuilder::with_capacity(size);
190
191 match datatype {
192 ConcreteDataType::Binary(_) => {
194 for i in 0..size {
195 let json = jsons.get_ref(i);
196 let path = paths.get_ref(i);
197
198 let json = json.as_binary();
199 let path = path.as_string();
200 let result = match (json, path) {
201 (Ok(Some(json)), Ok(Some(path))) => {
202 get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
203 }
204 _ => None,
205 };
206
207 results.push(result.as_deref());
208 }
209 }
210 _ => {
211 return UnsupportedInputDataTypeSnafu {
212 function: "json_get_string",
213 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
214 }
215 .fail();
216 }
217 }
218
219 Ok(results.to_vector())
220 }
221}
222
223impl Display for JsonGetString {
224 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
225 write!(f, "{}", "json_get_string".to_ascii_uppercase())
226 }
227}
228
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::TypeSignature;
    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::{BinaryVector, StringVector};

    use super::*;

    /// Asserts that `signature` is the exact, immutable `(Binary, Utf8)` signature.
    fn assert_binary_utf8_signature(signature: Signature) {
        assert!(matches!(signature,
            Signature {
                type_signature: TypeSignature::Exact(valid_types),
                volatility: Volatility::Immutable
            } if valid_types == vec![DataType::Binary, DataType::Utf8]
        ));
    }

    /// Encodes each JSON text as JSONB and pairs the resulting binary column
    /// with a string column of paths, ready to be passed to `eval`.
    fn build_args(json_strings: &[&str], paths: Vec<&str>) -> Vec<VectorRef> {
        let jsonbs = json_strings
            .iter()
            .map(|text| jsonb::parse_value(text.as_bytes()).unwrap().to_vec())
            .collect::<Vec<_>>();

        vec![
            Arc::new(BinaryVector::from_vec(jsonbs)) as VectorRef,
            Arc::new(StringVector::from_vec(paths)) as VectorRef,
        ]
    }

    #[test]
    fn test_json_get_int() {
        let func = JsonGetInt;

        assert_eq!("json_get_int", func.name());
        let return_type = func
            .return_type(&[DataType::Binary, DataType::Utf8])
            .unwrap();
        assert_eq!(DataType::Int64, return_type);
        assert_binary_utf8_signature(func.signature());

        let args = build_args(
            &[
                r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
                r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
                r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not an integer.
        let expected = [Some(2), Some(4), None];

        let vector = func.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let cell = vector.get_ref(row);
            assert_eq!(*want, cell.as_i64().unwrap());
        }
    }

    #[test]
    fn test_json_get_float() {
        let func = JsonGetFloat;

        assert_eq!("json_get_float", func.name());
        let return_type = func
            .return_type(&[DataType::Binary, DataType::Utf8])
            .unwrap();
        assert_eq!(DataType::Float64, return_type);
        assert_binary_utf8_signature(func.signature());

        let args = build_args(
            &[
                r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
                r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
                r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not a float.
        let expected = [Some(2.1), Some(4.4), None];

        let vector = func.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let cell = vector.get_ref(row);
            assert_eq!(*want, cell.as_f64().unwrap());
        }
    }

    #[test]
    fn test_json_get_bool() {
        let func = JsonGetBool;

        assert_eq!("json_get_bool", func.name());
        let return_type = func
            .return_type(&[DataType::Binary, DataType::Utf8])
            .unwrap();
        assert_eq!(DataType::Boolean, return_type);
        assert_binary_utf8_signature(func.signature());

        let args = build_args(
            &[
                r#"{"a": {"b": true}, "b": false, "c": true}"#,
                r#"{"a": false, "b": {"c": true}, "c": false}"#,
                r#"{"a": true, "b": false, "c": {"a": true}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not a boolean.
        let expected = [Some(true), Some(false), None];

        let vector = func.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let cell = vector.get_ref(row);
            assert_eq!(*want, cell.as_boolean().unwrap());
        }
    }

    #[test]
    fn test_json_get_string() {
        let func = JsonGetString;

        assert_eq!("json_get_string", func.name());
        let return_type = func
            .return_type(&[DataType::Binary, DataType::Utf8])
            .unwrap();
        assert_eq!(DataType::Utf8, return_type);
        assert_binary_utf8_signature(func.signature());

        let args = build_args(
            &[
                r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
                r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
                r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
            ],
            vec!["$.a.b", "$.a", ""],
        );
        // The last row uses an empty path and is expected to produce null.
        let expected = [Some("a"), Some("d"), None];

        let vector = func.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let cell = vector.get_ref(row);
            assert_eq!(*want, cell.as_string().unwrap());
        }
    }
}