common_function/scalars/json/
json_get.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::DataFusionError;
20use datafusion_common::arrow::array::{
21    Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
22    StringViewBuilder,
23};
24use datafusion_common::arrow::datatypes::DataType;
25use datafusion_expr::type_coercion::aggregates::STRINGS;
26use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
27
28use crate::function::{Function, extract_args};
29use crate::helper;
30
31fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
32    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
33    match json_path {
34        Ok(json_path) => {
35            let mut sub_jsonb = Vec::new();
36            let mut sub_offsets = Vec::new();
37            match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
38                Ok(_) => Some(sub_jsonb),
39                Err(_) => None,
40            }
41        }
42        _ => None,
43    }
44}
45
/// Generates a scalar UDF that reads the value at a JSON path inside a JSONB column and
/// converts it to a primitive Arrow type.
///
/// A row evaluates to `NULL` when either input is `NULL`, when the path cannot be evaluated,
/// or when the selected value is rejected by the matching `jsonb::to_*` converter.
macro_rules! json_get {
    // Arguments:
    // - `$name`: UDF struct name, e.g. `JsonGetInt` (its snake_case form becomes the SQL name);
    // - `$type`: Arrow `DataType` variant returned, e.g. `Int64` (also selects `Int64Builder`);
    // - `$rust_type`: primitive selecting the converter, e.g. `i64` -> `jsonb::to_i64`;
    // - `$doc`: doc string attached to the generated struct.
    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
        paste::paste! {
            #[doc = $doc]
            #[derive(Clone, Debug)]
            pub struct $name {
                signature: Signature,
            }

            impl $name {
                /// SQL-visible function name: the struct name in snake_case.
                pub const NAME: &'static str = stringify!([<$name:snake>]);
            }

            impl Default for $name {
                fn default() -> Self {
                    // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
                    let json_types = vec![DataType::Binary, DataType::BinaryView];
                    let path_types = vec![DataType::Utf8, DataType::Utf8View];
                    Self {
                        signature: helper::one_of_sigs2(json_types, path_types),
                    }
                }
            }

            impl Function for $name {
                fn name(&self) -> &str {
                    Self::NAME
                }

                fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                    Ok(DataType::[<$type>])
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn invoke_with_args(
                    &self,
                    args: ScalarFunctionArgs,
                ) -> datafusion_common::Result<ColumnarValue> {
                    let [json_arg, path_arg] = extract_args(self.name(), &args)?;
                    // Normalize every accepted input type to its "view" representation so a
                    // single code path handles them all.
                    let json_view = compute::cast(&json_arg, &DataType::BinaryView)?;
                    let jsons = json_view.as_binary_view();
                    let path_view = compute::cast(&path_arg, &DataType::Utf8View)?;
                    let paths = path_view.as_string_view();

                    let row_count = jsons.len();
                    let mut builder = [<$type Builder>]::with_capacity(row_count);

                    for row in 0..row_count {
                        let value = match (jsons.is_valid(row), paths.is_valid(row)) {
                            (true, true) => get_json_by_path(jsons.value(row), paths.value(row))
                                .and_then(|sub| jsonb::[<to_ $rust_type>](&sub).ok()),
                            _ => None,
                        };
                        builder.append_option(value);
                    }

                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
                }
            }

            impl Display for $name {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    f.write_str(&Self::NAME.to_ascii_uppercase())
                }
            }
        }
    };
}
126
127json_get!(
128    JsonGetInt,
129    Int64,
130    i64,
131    "Get the value from the JSONB by the given path and return it as an integer."
132);
133
134json_get!(
135    JsonGetFloat,
136    Float64,
137    f64,
138    "Get the value from the JSONB by the given path and return it as a float."
139);
140
141json_get!(
142    JsonGetBool,
143    Boolean,
144    bool,
145    "Get the value from the JSONB by the given path and return it as a boolean."
146);
147
148/// Get the value from the JSONB by the given path and return it as a string.
149#[derive(Clone, Debug)]
150pub struct JsonGetString {
151    signature: Signature,
152}
153
154impl JsonGetString {
155    pub const NAME: &'static str = "json_get_string";
156}
157
158impl Default for JsonGetString {
159    fn default() -> Self {
160        Self {
161            // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
162            signature: helper::one_of_sigs2(
163                vec![DataType::Binary, DataType::BinaryView],
164                vec![DataType::Utf8, DataType::Utf8View],
165            ),
166        }
167    }
168}
169
170impl Function for JsonGetString {
171    fn name(&self) -> &str {
172        Self::NAME
173    }
174
175    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
176        Ok(DataType::Utf8View)
177    }
178
179    fn signature(&self) -> &Signature {
180        &self.signature
181    }
182
183    fn invoke_with_args(
184        &self,
185        args: ScalarFunctionArgs,
186    ) -> datafusion_common::Result<ColumnarValue> {
187        let [arg0, arg1] = extract_args(self.name(), &args)?;
188        let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
189        let jsons = arg0.as_binary_view();
190        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
191        let paths = arg1.as_string_view();
192
193        let size = jsons.len();
194        let mut builder = StringViewBuilder::with_capacity(size);
195
196        for i in 0..size {
197            let json = jsons.is_valid(i).then(|| jsons.value(i));
198            let path = paths.is_valid(i).then(|| paths.value(i));
199            let result = match (json, path) {
200                (Some(json), Some(path)) => {
201                    get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
202                }
203                _ => None,
204            };
205            builder.append_option(result);
206        }
207
208        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
209    }
210}
211
212impl Display for JsonGetString {
213    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
214        write!(f, "{}", Self::NAME.to_ascii_uppercase())
215    }
216}
217
218/// Get the object from JSON value by path.
219pub(super) struct JsonGetObject {
220    signature: Signature,
221}
222
223impl JsonGetObject {
224    const NAME: &'static str = "json_get_object";
225}
226
227impl Default for JsonGetObject {
228    fn default() -> Self {
229        Self {
230            signature: helper::one_of_sigs2(
231                vec![
232                    DataType::Binary,
233                    DataType::LargeBinary,
234                    DataType::BinaryView,
235                ],
236                STRINGS.to_vec(),
237            ),
238        }
239    }
240}
241
242impl Function for JsonGetObject {
243    fn name(&self) -> &str {
244        Self::NAME
245    }
246
247    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
248        Ok(DataType::BinaryView)
249    }
250
251    fn signature(&self) -> &Signature {
252        &self.signature
253    }
254
255    fn invoke_with_args(
256        &self,
257        args: ScalarFunctionArgs,
258    ) -> datafusion_common::Result<ColumnarValue> {
259        let [arg0, arg1] = extract_args(self.name(), &args)?;
260        let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
261        let jsons = arg0.as_binary_view();
262        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
263        let paths = arg1.as_string_view();
264
265        let len = jsons.len();
266        let mut builder = BinaryViewBuilder::with_capacity(len);
267
268        for i in 0..len {
269            let json = jsons.is_valid(i).then(|| jsons.value(i));
270            let path = paths.is_valid(i).then(|| paths.value(i));
271            let result = if let (Some(json), Some(path)) = (json, path) {
272                let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
273                    let mut data = Vec::new();
274                    let mut offset = Vec::new();
275                    jsonb::get_by_path(json, path, &mut data, &mut offset)
276                        .map(|()| jsonb::is_object(&data).then_some(data))
277                });
278                result.map_err(|e| DataFusionError::Execution(e.to_string()))?
279            } else {
280                None
281            };
282            builder.append_option(result);
283        }
284
285        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
286    }
287}
288
289impl Display for JsonGetObject {
290    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291        write!(f, "{}", Self::NAME.to_ascii_uppercase())
292    }
293}
294
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::ScalarValue;
    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
    use datatypes::types::parse_string_to_jsonb;

    use super::*;

    #[test]
    fn test_json_get_int() {
        let json_get_int = JsonGetInt::default();

        assert_eq!("json_get_int", json_get_int.name());
        assert_eq!(
            DataType::Int64,
            json_get_int
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
        ];
        let paths = vec!["$.a.b", "$.a", "$.c"];
        let results = [Some(2), Some(4), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Int64, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_int
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_primitive::<Int64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    /// A `NULL` JSON value or a `NULL` path must produce `NULL` without affecting other rows.
    #[test]
    fn test_json_get_int_with_null_inputs() {
        let json_get_int = JsonGetInt::default();

        let jsonb = jsonb::parse_value(r#"{"a": 1}"#.as_bytes())
            .unwrap()
            .to_vec();
        let jsons = BinaryArray::from(vec![
            Some(jsonb.as_slice()),
            None,
            Some(jsonb.as_slice()),
        ]);
        let paths = StringArray::from(vec![Some("$.a"), Some("$.a"), None]);
        let results = [Some(1), None, None];

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(jsons)),
                ColumnarValue::Array(Arc::new(paths)),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Int64, true)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_int
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_primitive::<Int64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_float() {
        let json_get_float = JsonGetFloat::default();

        assert_eq!("json_get_float", json_get_float.name());
        assert_eq!(
            DataType::Float64,
            json_get_float
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
        ];
        let paths = vec!["$.a.b", "$.a", "$.c"];
        let results = [Some(2.1), Some(4.4), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Float64, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_float
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_primitive::<Float64Type>();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_bool() {
        let json_get_bool = JsonGetBool::default();

        assert_eq!("json_get_bool", json_get_bool.name());
        assert_eq!(
            DataType::Boolean,
            json_get_bool
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": true}, "b": false, "c": true}"#,
            r#"{"a": false, "b": {"c": true}, "c": false}"#,
            r#"{"a": true, "b": false, "c": {"a": true}}"#,
        ];
        let paths = vec!["$.a.b", "$.a", "$.c"];
        let results = [Some(true), Some(false), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_bool
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_boolean();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_string() {
        let json_get_string = JsonGetString::default();

        assert_eq!("json_get_string", json_get_string.name());
        assert_eq!(
            DataType::Utf8View,
            json_get_string
                .return_type(&[DataType::Binary, DataType::Utf8])
                .unwrap()
        );

        let json_strings = [
            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
        ];
        // The empty path is not a valid JSON path, so the last row must be `NULL`.
        let paths = vec!["$.a.b", "$.a", ""];
        let results = [Some("a"), Some("d"), None];

        let jsonbs = json_strings
            .iter()
            .map(|s| {
                let value = jsonb::parse_value(s.as_bytes()).unwrap();
                value.to_vec()
            })
            .collect::<Vec<_>>();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows: 3,
            return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = json_get_string
            .invoke_with_args(args)
            .and_then(|x| x.to_array(3))
            .unwrap();
        let vector = result.as_string_view();

        assert_eq!(3, vector.len());
        for (i, gt) in results.iter().enumerate() {
            let result = vector.is_valid(i).then(|| vector.value(i));
            assert_eq!(*gt, result);
        }
    }

    #[test]
    fn test_json_get_object() -> datafusion_common::Result<()> {
        let udf = JsonGetObject::default();
        assert_eq!("json_get_object", udf.name());
        assert_eq!(
            DataType::BinaryView,
            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
        );

        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
        let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
        let number_rows = paths.len();

        let args = ScalarFunctionArgs {
            args: vec![
                ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
            ],
            arg_fields: vec![],
            number_rows,
            // Must agree with `JsonGetObject::return_type`, which is `BinaryView`.
            return_field: Arc::new(Field::new("x", DataType::BinaryView, false)),
            config_options: Arc::new(Default::default()),
        };
        let result = udf
            .invoke_with_args(args)
            .and_then(|x| x.to_array(number_rows))?;
        let result = result.as_binary_view();

        let expected = &BinaryViewArray::from_iter(
            vec![
                Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
                Some(r#"{"b": {"c": {"d": 1}}}"#),
                Some(r#"{"c": {"d": 1}}"#),
                Some(r#"{"d": 1}"#),
                None,
                None,
                None,
            ]
            .into_iter()
            .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
        );
        assert_eq!(result, expected);
        Ok(())
    }
}
555}