common_function/scalars/json/
json_get.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Display};
16use std::sync::Arc;
17
18use arrow::compute;
19use datafusion_common::arrow::array::{
20    Array, AsArray, BooleanBuilder, Float64Builder, Int64Builder, StringViewBuilder,
21};
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature};
24
25use crate::function::{Function, extract_args};
26use crate::helper;
27
28fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
29    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
30    match json_path {
31        Ok(json_path) => {
32            let mut sub_jsonb = Vec::new();
33            let mut sub_offsets = Vec::new();
34            match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
35                Ok(_) => Some(sub_jsonb),
36                Err(_) => None,
37            }
38        }
39        _ => None,
40    }
41}
42
43/// Get the value from the JSONB by the given path and return it as specified type.
44/// If the path does not exist or the value is not the type specified, return `NULL`.
45macro_rules! json_get {
46    // e.g. name = JsonGetInt, type = Int64, rust_type = i64, doc = "Get the value from the JSONB by the given path and return it as an integer."
47    ($name:ident, $type:ident, $rust_type:ident, $doc:expr) => {
48        paste::paste! {
49            #[doc = $doc]
50            #[derive(Clone, Debug)]
51            pub struct $name {
52                signature: Signature,
53            }
54
55            impl $name {
56                pub const NAME: &'static str = stringify!([<$name:snake>]);
57            }
58
59            impl Default for $name {
60                fn default() -> Self {
61                    Self {
62                        // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
63                        signature: helper::one_of_sigs2(
64                            vec![DataType::Binary, DataType::BinaryView],
65                            vec![DataType::Utf8, DataType::Utf8View],
66                        ),
67                    }
68                }
69            }
70
71            impl Function for $name {
72                fn name(&self) -> &str {
73                    Self::NAME
74                }
75
76                fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
77                    Ok(DataType::[<$type>])
78                }
79
80                fn signature(&self) -> &Signature {
81                    &self.signature
82                }
83
84                fn invoke_with_args(
85                    &self,
86                    args: ScalarFunctionArgs,
87                ) -> datafusion_common::Result<ColumnarValue> {
88                    let [arg0, arg1] = extract_args(self.name(), &args)?;
89                    let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
90                    let jsons = arg0.as_binary_view();
91                    let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
92                    let paths = arg1.as_string_view();
93
94                    let size = jsons.len();
95                    let mut builder = [<$type Builder>]::with_capacity(size);
96
97                    for i in 0..size {
98                        let json = jsons.is_valid(i).then(|| jsons.value(i));
99                        let path = paths.is_valid(i).then(|| paths.value(i));
100                        let result = match (json, path) {
101                            (Some(json), Some(path)) => {
102                                get_json_by_path(json, path)
103                                    .and_then(|json| { jsonb::[<to_ $rust_type>](&json).ok() })
104                            }
105                            _ => None,
106                        };
107
108                        builder.append_option(result);
109                    }
110
111                    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
112                }
113            }
114
115            impl Display for $name {
116                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117                    write!(f, "{}", Self::NAME.to_ascii_uppercase())
118                }
119            }
120        }
121    };
122}
123
124json_get!(
125    JsonGetInt,
126    Int64,
127    i64,
128    "Get the value from the JSONB by the given path and return it as an integer."
129);
130
131json_get!(
132    JsonGetFloat,
133    Float64,
134    f64,
135    "Get the value from the JSONB by the given path and return it as a float."
136);
137
138json_get!(
139    JsonGetBool,
140    Boolean,
141    bool,
142    "Get the value from the JSONB by the given path and return it as a boolean."
143);
144
145/// Get the value from the JSONB by the given path and return it as a string.
146#[derive(Clone, Debug)]
147pub struct JsonGetString {
148    signature: Signature,
149}
150
151impl JsonGetString {
152    pub const NAME: &'static str = "json_get_string";
153}
154
155impl Default for JsonGetString {
156    fn default() -> Self {
157        Self {
158            // TODO(LFC): Use a more clear type here instead of "Binary" for Json input, once we have a "Json" type.
159            signature: helper::one_of_sigs2(
160                vec![DataType::Binary, DataType::BinaryView],
161                vec![DataType::Utf8, DataType::Utf8View],
162            ),
163        }
164    }
165}
166
167impl Function for JsonGetString {
168    fn name(&self) -> &str {
169        Self::NAME
170    }
171
172    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
173        Ok(DataType::Utf8View)
174    }
175
176    fn signature(&self) -> &Signature {
177        &self.signature
178    }
179
180    fn invoke_with_args(
181        &self,
182        args: ScalarFunctionArgs,
183    ) -> datafusion_common::Result<ColumnarValue> {
184        let [arg0, arg1] = extract_args(self.name(), &args)?;
185        let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
186        let jsons = arg0.as_binary_view();
187        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
188        let paths = arg1.as_string_view();
189
190        let size = jsons.len();
191        let mut builder = StringViewBuilder::with_capacity(size);
192
193        for i in 0..size {
194            let json = jsons.is_valid(i).then(|| jsons.value(i));
195            let path = paths.is_valid(i).then(|| paths.value(i));
196            let result = match (json, path) {
197                (Some(json), Some(path)) => {
198                    get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
199                }
200                _ => None,
201            };
202            builder.append_option(result);
203        }
204
205        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
206    }
207}
208
209impl Display for JsonGetString {
210    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
211        write!(f, "{}", Self::NAME.to_ascii_uppercase())
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use std::sync::Arc;
218
219    use arrow_schema::Field;
220    use datafusion_common::arrow::array::{BinaryArray, StringArray};
221    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
222
223    use super::*;
224
225    #[test]
226    fn test_json_get_int() {
227        let json_get_int = JsonGetInt::default();
228
229        assert_eq!("json_get_int", json_get_int.name());
230        assert_eq!(
231            DataType::Int64,
232            json_get_int
233                .return_type(&[DataType::Binary, DataType::Utf8])
234                .unwrap()
235        );
236
237        let json_strings = [
238            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
239            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
240            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
241        ];
242        let paths = vec!["$.a.b", "$.a", "$.c"];
243        let results = [Some(2), Some(4), None];
244
245        let jsonbs = json_strings
246            .iter()
247            .map(|s| {
248                let value = jsonb::parse_value(s.as_bytes()).unwrap();
249                value.to_vec()
250            })
251            .collect::<Vec<_>>();
252
253        let args = ScalarFunctionArgs {
254            args: vec![
255                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
256                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
257            ],
258            arg_fields: vec![],
259            number_rows: 3,
260            return_field: Arc::new(Field::new("x", DataType::Int64, false)),
261            config_options: Arc::new(Default::default()),
262        };
263        let result = json_get_int
264            .invoke_with_args(args)
265            .and_then(|x| x.to_array(3))
266            .unwrap();
267        let vector = result.as_primitive::<Int64Type>();
268
269        assert_eq!(3, vector.len());
270        for (i, gt) in results.iter().enumerate() {
271            let result = vector.is_valid(i).then(|| vector.value(i));
272            assert_eq!(*gt, result);
273        }
274    }
275
276    #[test]
277    fn test_json_get_float() {
278        let json_get_float = JsonGetFloat::default();
279
280        assert_eq!("json_get_float", json_get_float.name());
281        assert_eq!(
282            DataType::Float64,
283            json_get_float
284                .return_type(&[DataType::Binary, DataType::Utf8])
285                .unwrap()
286        );
287
288        let json_strings = [
289            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
290            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
291            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
292        ];
293        let paths = vec!["$.a.b", "$.a", "$.c"];
294        let results = [Some(2.1), Some(4.4), None];
295
296        let jsonbs = json_strings
297            .iter()
298            .map(|s| {
299                let value = jsonb::parse_value(s.as_bytes()).unwrap();
300                value.to_vec()
301            })
302            .collect::<Vec<_>>();
303
304        let args = ScalarFunctionArgs {
305            args: vec![
306                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
307                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
308            ],
309            arg_fields: vec![],
310            number_rows: 3,
311            return_field: Arc::new(Field::new("x", DataType::Float64, false)),
312            config_options: Arc::new(Default::default()),
313        };
314        let result = json_get_float
315            .invoke_with_args(args)
316            .and_then(|x| x.to_array(3))
317            .unwrap();
318        let vector = result.as_primitive::<Float64Type>();
319
320        assert_eq!(3, vector.len());
321        for (i, gt) in results.iter().enumerate() {
322            let result = vector.is_valid(i).then(|| vector.value(i));
323            assert_eq!(*gt, result);
324        }
325    }
326
327    #[test]
328    fn test_json_get_bool() {
329        let json_get_bool = JsonGetBool::default();
330
331        assert_eq!("json_get_bool", json_get_bool.name());
332        assert_eq!(
333            DataType::Boolean,
334            json_get_bool
335                .return_type(&[DataType::Binary, DataType::Utf8])
336                .unwrap()
337        );
338
339        let json_strings = [
340            r#"{"a": {"b": true}, "b": false, "c": true}"#,
341            r#"{"a": false, "b": {"c": true}, "c": false}"#,
342            r#"{"a": true, "b": false, "c": {"a": true}}"#,
343        ];
344        let paths = vec!["$.a.b", "$.a", "$.c"];
345        let results = [Some(true), Some(false), None];
346
347        let jsonbs = json_strings
348            .iter()
349            .map(|s| {
350                let value = jsonb::parse_value(s.as_bytes()).unwrap();
351                value.to_vec()
352            })
353            .collect::<Vec<_>>();
354
355        let args = ScalarFunctionArgs {
356            args: vec![
357                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
358                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
359            ],
360            arg_fields: vec![],
361            number_rows: 3,
362            return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
363            config_options: Arc::new(Default::default()),
364        };
365        let result = json_get_bool
366            .invoke_with_args(args)
367            .and_then(|x| x.to_array(3))
368            .unwrap();
369        let vector = result.as_boolean();
370
371        assert_eq!(3, vector.len());
372        for (i, gt) in results.iter().enumerate() {
373            let result = vector.is_valid(i).then(|| vector.value(i));
374            assert_eq!(*gt, result);
375        }
376    }
377
378    #[test]
379    fn test_json_get_string() {
380        let json_get_string = JsonGetString::default();
381
382        assert_eq!("json_get_string", json_get_string.name());
383        assert_eq!(
384            DataType::Utf8View,
385            json_get_string
386                .return_type(&[DataType::Binary, DataType::Utf8])
387                .unwrap()
388        );
389
390        let json_strings = [
391            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
392            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
393            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
394        ];
395        let paths = vec!["$.a.b", "$.a", ""];
396        let results = [Some("a"), Some("d"), None];
397
398        let jsonbs = json_strings
399            .iter()
400            .map(|s| {
401                let value = jsonb::parse_value(s.as_bytes()).unwrap();
402                value.to_vec()
403            })
404            .collect::<Vec<_>>();
405
406        let args = ScalarFunctionArgs {
407            args: vec![
408                ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
409                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
410            ],
411            arg_fields: vec![],
412            number_rows: 3,
413            return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
414            config_options: Arc::new(Default::default()),
415        };
416        let result = json_get_string
417            .invoke_with_args(args)
418            .and_then(|x| x.to_array(3))
419            .unwrap();
420        let vector = result.as_string_view();
421
422        assert_eq!(3, vector.len());
423        for (i, gt) in results.iter().enumerate() {
424            let result = vector.is_valid(i).then(|| vector.value(i));
425            assert_eq!(*gt, result);
426        }
427    }
428}