1use std::fmt::{self, Display};
16
17use common_query::error::{InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu};
18use common_query::prelude::Signature;
19use datafusion::logical_expr::Volatility;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::prelude::VectorRef;
22use datatypes::scalars::ScalarVectorBuilder;
23use datatypes::vectors::{
24 BooleanVectorBuilder, Float64VectorBuilder, Int64VectorBuilder, MutableVector,
25 StringVectorBuilder,
26};
27use snafu::ensure;
28
29use crate::function::{Function, FunctionContext};
30
31fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
32 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
33 match json_path {
34 Ok(json_path) => {
35 let mut sub_jsonb = Vec::new();
36 let mut sub_offsets = Vec::new();
37 match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
38 Ok(_) => Some(sub_jsonb),
39 Err(_) => None,
40 }
41 }
42 _ => None,
43 }
44}
45
/// Generates a `json_get_*` scalar function.
///
/// Arguments:
/// - `$name`: name of the generated unit struct (e.g. `JsonGetInt`); its
///   snake-case form is what `Function::name` returns.
/// - `$type`: prefix of the result vector builder and datatype constructor
///   (e.g. `Int64` selects `Int64VectorBuilder` and `int64_datatype()`).
/// - `$rust_type`: suffix of the `jsonb::to_*` conversion applied to the
///   extracted value (e.g. `i64` selects `jsonb::to_i64`).
/// - `$doc`: doc string attached to the generated struct.
///
/// The generated `eval` expects exactly two columns, a binary column of encoded
/// JSON values and a string column of JSON paths. It yields one value per row.
/// A row becomes `None` when either input cannot be read, the path lookup
/// fails, or the conversion fails.
macro_rules! json_get {
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug, Default)]
            pub struct $name;

            impl Function for $name {
                fn name(&self) -> &str {
                    stringify!([<$name:snake>])
                }

                fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
                    Ok(ConcreteDataType::[<$type:snake _datatype>]())
                }

                fn signature(&self) -> Signature {
                    let arg_types = vec![
                        ConcreteDataType::json_datatype(),
                        ConcreteDataType::string_datatype(),
                    ];
                    Signature::exact(arg_types, Volatility::Immutable)
                }

                fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
                    ensure!(
                        columns.len() == 2,
                        InvalidFuncArgsSnafu {
                            err_msg: format!(
                                "The length of the args is not correct, expect exactly two, have: {}",
                                columns.len()
                            ),
                        }
                    );
                    let (jsons, paths) = (&columns[0], &columns[1]);

                    // Only a binary first column is accepted; anything else is rejected up front.
                    if !matches!(jsons.data_type(), ConcreteDataType::Binary(_)) {
                        return UnsupportedInputDataTypeSnafu {
                            function: stringify!([<$name:snake>]),
                            datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
                        }
                        .fail();
                    }

                    let row_count = jsons.len();
                    let mut builder = [<$type VectorBuilder>]::with_capacity(row_count);

                    for row in 0..row_count {
                        let json_ref = jsons.get_ref(row);
                        let path_ref = paths.get_ref(row);

                        // Any unreadable input, failed lookup or failed conversion yields `None`.
                        let value = if let (Ok(Some(json)), Ok(Some(path))) =
                            (json_ref.as_binary(), path_ref.as_string())
                        {
                            get_json_by_path(json, path)
                                .and_then(|sub| jsonb::[<to_ $rust_type>](&sub).ok())
                        } else {
                            None
                        };

                        builder.push(value);
                    }

                    Ok(builder.to_vector())
                }
            }

            impl Display for $name {
                /// Writes the upper-cased snake-case function name.
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    f.write_str(&stringify!([<$name:snake>]).to_ascii_uppercase())
                }
            }
        }
    };
}
133
// `JsonGetInt`: function name `json_get_int`, results collected in an
// `Int64VectorBuilder`, extracted values converted with `jsonb::to_i64`.
json_get!(
    JsonGetInt,
    Int64,
    i64,
    "Get the value from the JSONB by the given path and return it as an integer."
);

// `JsonGetFloat`: function name `json_get_float`, results collected in a
// `Float64VectorBuilder`, extracted values converted with `jsonb::to_f64`.
json_get!(
    JsonGetFloat,
    Float64,
    f64,
    "Get the value from the JSONB by the given path and return it as a float."
);

// `JsonGetBool`: function name `json_get_bool`, results collected in a
// `BooleanVectorBuilder`, extracted values converted with `jsonb::to_bool`.
json_get!(
    JsonGetBool,
    Boolean,
    bool,
    "Get the value from the JSONB by the given path and return it as a boolean."
);
154
/// Get the value from the JSONB by the given path and return it as a string.
#[derive(Clone, Debug, Default)]
pub struct JsonGetString;
158
159impl Function for JsonGetString {
160 fn name(&self) -> &str {
161 "json_get_string"
162 }
163
164 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
165 Ok(ConcreteDataType::string_datatype())
166 }
167
168 fn signature(&self) -> Signature {
169 Signature::exact(
170 vec![
171 ConcreteDataType::json_datatype(),
172 ConcreteDataType::string_datatype(),
173 ],
174 Volatility::Immutable,
175 )
176 }
177
178 fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
179 ensure!(
180 columns.len() == 2,
181 InvalidFuncArgsSnafu {
182 err_msg: format!(
183 "The length of the args is not correct, expect exactly two, have: {}",
184 columns.len()
185 ),
186 }
187 );
188 let jsons = &columns[0];
189 let paths = &columns[1];
190
191 let size = jsons.len();
192 let datatype = jsons.data_type();
193 let mut results = StringVectorBuilder::with_capacity(size);
194
195 match datatype {
196 ConcreteDataType::Binary(_) => {
198 for i in 0..size {
199 let json = jsons.get_ref(i);
200 let path = paths.get_ref(i);
201
202 let json = json.as_binary();
203 let path = path.as_string();
204 let result = match (json, path) {
205 (Ok(Some(json)), Ok(Some(path))) => {
206 get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
207 }
208 _ => None,
209 };
210
211 results.push(result.as_deref());
212 }
213 }
214 _ => {
215 return UnsupportedInputDataTypeSnafu {
216 function: "json_get_string",
217 datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
218 }
219 .fail();
220 }
221 }
222
223 Ok(results.to_vector())
224 }
225}
226
227impl Display for JsonGetString {
228 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229 write!(f, "{}", "json_get_string".to_ascii_uppercase())
230 }
231}
232
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_query::prelude::TypeSignature;
    use datatypes::scalars::ScalarVector;
    use datatypes::vectors::{BinaryVector, StringVector};

    use super::*;

    /// Argument types every `json_get_*` function is expected to declare.
    fn expected_arg_types() -> Vec<ConcreteDataType> {
        vec![
            ConcreteDataType::json_datatype(),
            ConcreteDataType::string_datatype(),
        ]
    }

    /// Asserts that `signature` is an exact, immutable `(json, string)` signature.
    fn assert_json_path_signature(signature: Signature) {
        assert!(matches!(signature,
            Signature {
                type_signature: TypeSignature::Exact(valid_types),
                volatility: Volatility::Immutable
            } if valid_types == expected_arg_types()
        ));
    }

    /// Encodes every JSON text with `jsonb` and pairs the encoded values with
    /// `paths` as the two `eval` argument columns.
    fn eval_args(json_strings: &[&str], paths: Vec<&str>) -> Vec<VectorRef> {
        let jsonbs = json_strings
            .iter()
            .map(|text| jsonb::parse_value(text.as_bytes()).unwrap().to_vec())
            .collect::<Vec<_>>();

        vec![
            Arc::new(BinaryVector::from_vec(jsonbs)) as VectorRef,
            Arc::new(StringVector::from_vec(paths)) as VectorRef,
        ]
    }

    #[test]
    fn test_json_get_int() {
        let function = JsonGetInt;

        assert_eq!("json_get_int", function.name());
        assert_eq!(
            ConcreteDataType::int64_datatype(),
            function.return_type(&expected_arg_types()).unwrap()
        );
        assert_json_path_signature(function.signature());

        let args = eval_args(
            &[
                r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
                r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
                r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not an integer.
        let expected = [Some(2), Some(4), None];

        let vector = function.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let value_ref = vector.get_ref(row);
            let got = value_ref.as_i64().unwrap();
            assert_eq!(*want, got);
        }
    }

    #[test]
    fn test_json_get_float() {
        let function = JsonGetFloat;

        assert_eq!("json_get_float", function.name());
        assert_eq!(
            ConcreteDataType::float64_datatype(),
            function.return_type(&expected_arg_types()).unwrap()
        );
        assert_json_path_signature(function.signature());

        let args = eval_args(
            &[
                r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
                r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
                r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not a float.
        let expected = [Some(2.1), Some(4.4), None];

        let vector = function.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let value_ref = vector.get_ref(row);
            let got = value_ref.as_f64().unwrap();
            assert_eq!(*want, got);
        }
    }

    #[test]
    fn test_json_get_bool() {
        let function = JsonGetBool;

        assert_eq!("json_get_bool", function.name());
        assert_eq!(
            ConcreteDataType::boolean_datatype(),
            function.return_type(&expected_arg_types()).unwrap()
        );
        assert_json_path_signature(function.signature());

        let args = eval_args(
            &[
                r#"{"a": {"b": true}, "b": false, "c": true}"#,
                r#"{"a": false, "b": {"c": true}, "c": false}"#,
                r#"{"a": true, "b": false, "c": {"a": true}}"#,
            ],
            vec!["$.a.b", "$.a", "$.c"],
        );
        // The last path selects an object, which is not a boolean.
        let expected = [Some(true), Some(false), None];

        let vector = function.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let value_ref = vector.get_ref(row);
            let got = value_ref.as_boolean().unwrap();
            assert_eq!(*want, got);
        }
    }

    #[test]
    fn test_json_get_string() {
        let function = JsonGetString;

        assert_eq!("json_get_string", function.name());
        assert_eq!(
            ConcreteDataType::string_datatype(),
            function.return_type(&expected_arg_types()).unwrap()
        );
        assert_json_path_signature(function.signature());

        let args = eval_args(
            &[
                r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
                r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
                r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
            ],
            // The last row uses an empty path and is expected to yield `None`.
            vec!["$.a.b", "$.a", ""],
        );
        let expected = [Some("a"), Some("d"), None];

        let vector = function.eval(&FunctionContext::default(), &args).unwrap();

        assert_eq!(3, vector.len());
        for (row, want) in expected.iter().enumerate() {
            let value_ref = vector.get_ref(row);
            let got = value_ref.as_string().unwrap();
            assert_eq!(*want, got);
        }
    }
}