common_function/scalars/json/
json_get.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
19use arrow::compute;
20use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
21use arrow_schema::Field;
22use datafusion_common::arrow::array::{
23    Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24    StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result};
28use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
29use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
30use datatypes::json::JsonStructureSettings;
31use derive_more::Display;
32use jsonpath_rust::JsonPath;
33use serde_json::Value;
34
35use crate::function::{Function, extract_args};
36use crate::helper;
37
38fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
39    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
40    match json_path {
41        Ok(json_path) => {
42            let mut sub_jsonb = Vec::new();
43            let mut sub_offsets = Vec::new();
44            match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
45                Ok(_) => Some(sub_jsonb),
46                Err(_) => None,
47            }
48        }
49        _ => None,
50    }
51}
52
/// A single value located by a JSON get function, in whichever representation
/// the lookup produced it. Result builders convert it into their output type.
enum JsonResultValue<'a> {
    /// Owned bytes returned by the JSONB path lookup.
    Jsonb(Vec<u8>),
    /// A child column of a JSON struct array together with the row index to read.
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A borrowed `serde_json` value found by evaluating a JSON path.
    JsonStructByValue(&'a Value),
}
58
/// Accumulates the per-row results of a JSON get function into an Arrow array.
trait JsonGetResultBuilder {
    /// Appends one located value, converting it to the builder's output type.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends a null entry for a row that produced no value.
    fn append_null(&mut self);

    /// Finishes the builder and returns the accumulated array.
    fn build(&mut self) -> ArrayRef;
}
66
67/// Common implementation for JSON get scalar functions.
68///
69/// `JsonGet` encapsulates the logic for extracting values from JSON inputs
70/// based on a path expression. Different JSON get functions reuse this
71/// implementation by supplying their own `JsonGetResultBuilder` to control
72/// how the resulting values are materialized into an Arrow array.
73#[derive(Debug)]
74struct JsonGet {
75    signature: Signature,
76}
77
78impl JsonGet {
79    fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
80    where
81        F: Fn(usize) -> B,
82        B: JsonGetResultBuilder,
83    {
84        let [arg0, arg1] = extract_args("JSON_GET", &args)?;
85
86        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
87        let paths = arg1.as_string_view();
88
89        let mut builder = (builder_factory)(arg0.len());
90        match arg0.data_type() {
91            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
92                let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
93                let jsons = arg0.as_binary_view();
94                jsonb_get(jsons, paths, &mut builder)?;
95            }
96            DataType::Struct(_) => {
97                let jsons = arg0.as_struct();
98                json_struct_get(jsons, paths, &mut builder)?
99            }
100            _ => {
101                return Err(DataFusionError::Execution(format!(
102                    "JSON_GET not supported argument type {}",
103                    arg0.data_type(),
104                )));
105            }
106        };
107
108        Ok(ColumnarValue::Array(builder.build()))
109    }
110}
111
112impl Default for JsonGet {
113    fn default() -> Self {
114        Self {
115            signature: Signature::any(2, Volatility::Immutable),
116        }
117    }
118}
119
120// TODO: refactor this to StringLikeArrayBuilder from Arrow 57
121struct StringResultBuilder(StringViewBuilder);
122
123impl JsonGetResultBuilder for StringResultBuilder {
124    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
125        match value {
126            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_str(&value).ok()),
127            JsonResultValue::JsonStructByColumn(column, i) => {
128                if let Some(v) = string_array_value_at_index(column, i) {
129                    self.0.append_value(v);
130                } else {
131                    self.0
132                        .append_value(arrow_cast::display::array_value_to_string(column, i)?);
133                }
134            }
135            JsonResultValue::JsonStructByValue(value) => {
136                if let Some(s) = value.as_str() {
137                    self.0.append_value(s)
138                } else {
139                    self.0.append_value(value.to_string())
140                }
141            }
142        }
143        Ok(())
144    }
145
146    fn append_null(&mut self) {
147        self.0.append_null();
148    }
149
150    fn build(&mut self) -> ArrayRef {
151        Arc::new(self.0.finish())
152    }
153}
154
155#[derive(Default, Display, Debug)]
156#[display("{}", Self::NAME.to_ascii_uppercase())]
157pub struct JsonGetString(JsonGet);
158
159impl JsonGetString {
160    pub const NAME: &'static str = "json_get_string";
161}
162
163impl Function for JsonGetString {
164    fn name(&self) -> &str {
165        Self::NAME
166    }
167
168    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
169        Ok(DataType::Utf8View)
170    }
171
172    fn signature(&self) -> &Signature {
173        &self.0.signature
174    }
175
176    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
177        self.0.invoke(args, |len: usize| {
178            StringResultBuilder(StringViewBuilder::with_capacity(len))
179        })
180    }
181}
182
183struct IntResultBuilder(Int64Builder);
184
185impl JsonGetResultBuilder for IntResultBuilder {
186    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
187        match value {
188            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_i64(&value).ok()),
189            JsonResultValue::JsonStructByColumn(column, i) => {
190                self.0.append_option(int_array_value_at_index(column, i))
191            }
192            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_i64()),
193        }
194        Ok(())
195    }
196
197    fn append_null(&mut self) {
198        self.0.append_null();
199    }
200
201    fn build(&mut self) -> ArrayRef {
202        Arc::new(self.0.finish())
203    }
204}
205
206#[derive(Default, Display, Debug)]
207#[display("{}", Self::NAME.to_ascii_uppercase())]
208pub struct JsonGetInt(JsonGet);
209
210impl JsonGetInt {
211    pub const NAME: &'static str = "json_get_int";
212}
213
214impl Function for JsonGetInt {
215    fn name(&self) -> &str {
216        Self::NAME
217    }
218
219    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
220        Ok(DataType::Int64)
221    }
222
223    fn signature(&self) -> &Signature {
224        &self.0.signature
225    }
226
227    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
228        self.0.invoke(args, |len: usize| {
229            IntResultBuilder(Int64Builder::with_capacity(len))
230        })
231    }
232}
233
234struct FloatResultBuilder(Float64Builder);
235
236impl JsonGetResultBuilder for FloatResultBuilder {
237    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
238        match value {
239            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_f64(&value).ok()),
240            JsonResultValue::JsonStructByColumn(column, i) => {
241                let result = if column.data_type() == &DataType::Float64 {
242                    column
243                        .as_primitive::<Float64Type>()
244                        .is_valid(i)
245                        .then(|| column.as_primitive::<Float64Type>().value(i))
246                } else {
247                    None
248                };
249                self.0.append_option(result);
250            }
251            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_f64()),
252        }
253        Ok(())
254    }
255
256    fn append_null(&mut self) {
257        self.0.append_null();
258    }
259
260    fn build(&mut self) -> ArrayRef {
261        Arc::new(self.0.finish())
262    }
263}
264
265#[derive(Default, Display, Debug)]
266#[display("{}", Self::NAME.to_ascii_uppercase())]
267pub struct JsonGetFloat(JsonGet);
268
269impl JsonGetFloat {
270    pub const NAME: &'static str = "json_get_float";
271}
272
273impl Function for JsonGetFloat {
274    fn name(&self) -> &str {
275        Self::NAME
276    }
277
278    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
279        Ok(DataType::Float64)
280    }
281
282    fn signature(&self) -> &Signature {
283        &self.0.signature
284    }
285
286    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
287        self.0.invoke(args, |len: usize| {
288            FloatResultBuilder(Float64Builder::with_capacity(len))
289        })
290    }
291}
292
293struct BoolResultBuilder(BooleanBuilder);
294
295impl JsonGetResultBuilder for BoolResultBuilder {
296    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
297        match value {
298            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_bool(&value).ok()),
299            JsonResultValue::JsonStructByColumn(column, i) => {
300                let result = if column.data_type() == &DataType::Boolean {
301                    column
302                        .as_boolean()
303                        .is_valid(i)
304                        .then(|| column.as_boolean().value(i))
305                } else {
306                    None
307                };
308                self.0.append_option(result);
309            }
310            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_bool()),
311        }
312        Ok(())
313    }
314
315    fn append_null(&mut self) {
316        self.0.append_null();
317    }
318
319    fn build(&mut self) -> ArrayRef {
320        Arc::new(self.0.finish())
321    }
322}
323
324#[derive(Default, Display, Debug)]
325#[display("{}", Self::NAME.to_ascii_uppercase())]
326pub struct JsonGetBool(JsonGet);
327
328impl JsonGetBool {
329    pub const NAME: &'static str = "json_get_bool";
330}
331
332impl Function for JsonGetBool {
333    fn name(&self) -> &str {
334        Self::NAME
335    }
336
337    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
338        Ok(DataType::Boolean)
339    }
340
341    fn signature(&self) -> &Signature {
342        &self.0.signature
343    }
344
345    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
346        self.0.invoke(args, |len: usize| {
347            BoolResultBuilder(BooleanBuilder::with_capacity(len))
348        })
349    }
350}
351
352fn jsonb_get(
353    jsons: &BinaryViewArray,
354    paths: &StringViewArray,
355    builder: &mut dyn JsonGetResultBuilder,
356) -> Result<()> {
357    let size = jsons.len();
358    for i in 0..size {
359        let json = jsons.is_valid(i).then(|| jsons.value(i));
360        let path = paths.is_valid(i).then(|| paths.value(i));
361        let result = match (json, path) {
362            (Some(json), Some(path)) => get_json_by_path(json, path),
363            _ => None,
364        };
365        if let Some(v) = result {
366            builder.append_value(JsonResultValue::Jsonb(v))?;
367        } else {
368            builder.append_null();
369        }
370    }
371    Ok(())
372}
373
374fn json_struct_get(
375    jsons: &StructArray,
376    paths: &StringViewArray,
377    builder: &mut dyn JsonGetResultBuilder,
378) -> Result<()> {
379    let size = jsons.len();
380    for i in 0..size {
381        if jsons.is_null(i) || paths.is_null(i) {
382            builder.append_null();
383            continue;
384        }
385        let path = paths.value(i);
386
387        // naively assume the JSON path is our kind of indexing to the field, by removing its "root"
388        let field_path = path.trim().replace("$.", "");
389        let column = jsons.column_by_name(&field_path);
390
391        if let Some(column) = column {
392            builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
393        } else {
394            let Some(raw) = jsons
395                .column_by_name(JsonStructureSettings::RAW_FIELD)
396                .and_then(|x| string_array_value_at_index(x, i))
397            else {
398                builder.append_null();
399                continue;
400            };
401
402            let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
403                DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
404            })?;
405            // the wanted field is not retrievable from the JSON struct columns directly, we have
406            // to combine everything (columns and the "_raw") into a complete JSON value to find it
407            let value = json_struct_to_value(raw, jsons, i)?;
408
409            match path.find(&value) {
410                Value::Null => builder.append_null(),
411                Value::Array(values) => match values.as_slice() {
412                    [] => builder.append_null(),
413                    [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
414                    _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
415                },
416                value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
417            }
418        }
419    }
420
421    Ok(())
422}
423
424fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
425    let Ok(mut json) = Value::from_str(raw) else {
426        return Err(DataFusionError::Internal(format!(
427            "inner field '{}' is not a valid JSON string",
428            JsonStructureSettings::RAW_FIELD
429        )));
430    };
431
432    for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
433        if column_name == JsonStructureSettings::RAW_FIELD {
434            continue;
435        }
436
437        let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
438        {
439            let json_pointer = format!("/{}", json_object.replace(".", "/"));
440            (json_pointer, field)
441        } else {
442            ("".to_string(), column_name)
443        };
444        let Some(json_object) = json
445            .pointer_mut(&json_pointer)
446            .and_then(|x| x.as_object_mut())
447        else {
448            return Err(DataFusionError::Internal(format!(
449                "value at JSON pointer '{}' is not an object",
450                json_pointer
451            )));
452        };
453
454        macro_rules! insert {
455            ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
456                if let Some(value) = $column
457                    .is_valid($i)
458                    .then(|| serde_json::Value::from($column.value($i)))
459                {
460                    $json_object.insert($field.to_string(), value);
461                }
462            }};
463        }
464
465        match column.data_type() {
466            // boolean => Value::Bool
467            DataType::Boolean => {
468                let column = column.as_boolean();
469                insert!(column, i, json_object, field);
470            }
471            // int => Value::Number
472            DataType::Int64 => {
473                let column = column.as_primitive::<Int64Type>();
474                insert!(column, i, json_object, field);
475            }
476            DataType::UInt64 => {
477                let column = column.as_primitive::<UInt64Type>();
478                insert!(column, i, json_object, field);
479            }
480            DataType::Float64 => {
481                let column = column.as_primitive::<Float64Type>();
482                insert!(column, i, json_object, field);
483            }
484            // string => Value::String
485            DataType::Utf8 => {
486                let column = column.as_string::<i32>();
487                insert!(column, i, json_object, field);
488            }
489            DataType::LargeUtf8 => {
490                let column = column.as_string::<i64>();
491                insert!(column, i, json_object, field);
492            }
493            DataType::Utf8View => {
494                let column = column.as_string_view();
495                insert!(column, i, json_object, field);
496            }
497            // other => Value::Array and Value::Object
498            _ => {
499                return Err(DataFusionError::NotImplemented(format!(
500                    "{} is not yet supported to be executed with field {} of datatype {}",
501                    JsonGetString::NAME,
502                    column_name,
503                    column.data_type()
504                )));
505            }
506        }
507    }
508    Ok(json)
509}
510
511/// This function is mostly called as `json_get(value, 'attr')::type` and rewritten by
512/// `json_get_rewriter::JsonGetRewriter` to `json_get(value, 'attr', NULL::type)`. So we
513/// use the third argument's type to determine the return type.
514#[derive(Debug, Display)]
515#[display("{}", Self::NAME.to_ascii_uppercase())]
516pub(super) struct JsonGetWithType {
517    signature: Signature,
518}
519
520impl JsonGetWithType {
521    pub(crate) const NAME: &'static str = "json_get";
522}
523
524impl Default for JsonGetWithType {
525    fn default() -> Self {
526        Self {
527            signature: Signature::variadic_any(Volatility::Immutable),
528        }
529    }
530}
531
532impl Function for JsonGetWithType {
533    fn name(&self) -> &str {
534        Self::NAME
535    }
536
537    fn return_type(&self, _input_types: &[DataType]) -> datafusion_common::Result<DataType> {
538        Err(DataFusionError::Internal(
539            "This method isn't meant to be called".to_string(),
540        ))
541    }
542
543    fn return_field_from_args(
544        &self,
545        args: datafusion_expr::ReturnFieldArgs<'_>,
546    ) -> datafusion_common::Result<Arc<Field>> {
547        match args.scalar_arguments.get(2) {
548            Some(Some(v)) => {
549                let mut data_type = v.data_type();
550                if matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
551                    data_type = DataType::Utf8View;
552                }
553
554                Ok(Arc::new(Field::new(self.name(), data_type, true)))
555            }
556            _ => Ok(Arc::new(Field::new(self.name(), DataType::Utf8View, true))),
557        }
558    }
559
560    fn signature(&self) -> &Signature {
561        &self.signature
562    }
563
564    fn invoke_with_args(
565        &self,
566        args: ScalarFunctionArgs,
567    ) -> datafusion_common::Result<ColumnarValue> {
568        let [arg0, arg1, _] = extract_args(self.name(), &args)?;
569        let len = arg0.len();
570
571        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
572        let paths = arg1.as_string_view();
573
574        // mapping datatypes returned from return_field_from_args
575        let mut builder: Box<dyn JsonGetResultBuilder> = match args.return_field.data_type() {
576            DataType::Utf8View => {
577                Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
578            }
579            DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
580            DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
581            DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
582            _type => {
583                return Err(DataFusionError::Internal(format!(
584                    "Unsupported return type {}",
585                    _type
586                )));
587            }
588        };
589
590        match arg0.data_type() {
591            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
592                let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
593                let jsons = arg0.as_binary_view();
594                jsonb_get(jsons, paths, builder.as_mut())?;
595            }
596            DataType::Struct(_) => {
597                let jsons = arg0.as_struct();
598                json_struct_get(jsons, paths, builder.as_mut())?;
599            }
600            _ => {
601                return Err(DataFusionError::Execution(format!(
602                    "JSON_GET not supported argument type {}",
603                    arg0.data_type(),
604                )));
605            }
606        };
607
608        Ok(ColumnarValue::Array(builder.build()))
609    }
610}
611
612/// Get the object from JSON value by path.
613#[derive(Display, Debug)]
614#[display("{}", Self::NAME.to_ascii_uppercase())]
615pub(super) struct JsonGetObject {
616    signature: Signature,
617}
618
619impl JsonGetObject {
620    const NAME: &'static str = "json_get_object";
621}
622
623impl Default for JsonGetObject {
624    fn default() -> Self {
625        Self {
626            signature: helper::one_of_sigs2(
627                vec![
628                    DataType::Binary,
629                    DataType::LargeBinary,
630                    DataType::BinaryView,
631                ],
632                vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
633            ),
634        }
635    }
636}
637
638impl Function for JsonGetObject {
639    fn name(&self) -> &str {
640        Self::NAME
641    }
642
643    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
644        Ok(DataType::BinaryView)
645    }
646
647    fn signature(&self) -> &Signature {
648        &self.signature
649    }
650
651    fn invoke_with_args(
652        &self,
653        args: ScalarFunctionArgs,
654    ) -> datafusion_common::Result<ColumnarValue> {
655        let [arg0, arg1] = extract_args(self.name(), &args)?;
656        let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
657        let jsons = arg0.as_binary_view();
658        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
659        let paths = arg1.as_string_view();
660
661        let len = jsons.len();
662        let mut builder = BinaryViewBuilder::with_capacity(len);
663
664        for i in 0..len {
665            let json = jsons.is_valid(i).then(|| jsons.value(i));
666            let path = paths.is_valid(i).then(|| paths.value(i));
667            let result = if let (Some(json), Some(path)) = (json, path) {
668                let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
669                    let mut data = Vec::new();
670                    let mut offset = Vec::new();
671                    jsonb::get_by_path(json, path, &mut data, &mut offset)
672                        .map(|()| jsonb::is_object(&data).then_some(data))
673                });
674                result.map_err(|e| DataFusionError::Execution(e.to_string()))?
675            } else {
676                None
677            };
678            builder.append_option(result);
679        }
680
681        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
682    }
683}
684
685#[cfg(test)]
686mod tests {
687    use std::sync::Arc;
688
689    use arrow::array::{Float64Array, Int64Array, StructArray};
690    use arrow_schema::Field;
691    use datafusion_common::ScalarValue;
692    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
693    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
694    use datatypes::types::parse_string_to_jsonb;
695    use serde_json::json;
696
697    use super::*;
698
699    /// Create a JSON object like this (as a one element struct array for testing):
700    ///
701    /// ```JSON
702    /// {
703    ///     "kind": "foo",
704    ///     "payload": {
705    ///         "code": 404,
706    ///         "success": false,
707    ///         "result": {
708    ///             "error": "not found",
709    ///             "time_cost": 1.234
710    ///         }
711    ///     }
712    /// }
713    /// ```
714    fn test_json_struct() -> ArrayRef {
715        Arc::new(StructArray::new(
716            vec![
717                Field::new("kind", DataType::Utf8, true),
718                Field::new("payload.code", DataType::Int64, true),
719                Field::new("payload.result.time_cost", DataType::Float64, true),
720                Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
721            ]
722            .into(),
723            vec![
724                Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
725                Arc::new(Int64Array::from_iter([Some(404)])),
726                Arc::new(Float64Array::from_iter([Some(1.234)])),
727                Arc::new(StringViewArray::from_iter([Some(
728                    json! ({
729                        "payload": {
730                            "success": false,
731                            "result": {
732                                "error": "not found"
733                            }
734                        }
735                    })
736                    .to_string(),
737                )])),
738            ],
739            None,
740        ))
741    }
742
743    #[test]
744    fn test_json_get_int() {
745        let json_get_int = JsonGetInt::default();
746
747        assert_eq!("json_get_int", json_get_int.name());
748        assert_eq!(
749            DataType::Int64,
750            json_get_int
751                .return_type(&[DataType::Binary, DataType::Utf8])
752                .unwrap()
753        );
754
755        let json_strings = [
756            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
757            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
758            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
759        ];
760        let json_struct = test_json_struct();
761
762        let path_expects = vec![
763            ("$.a.b", Some(2)),
764            ("$.a", Some(4)),
765            ("$.c", None),
766            ("$.kind", None),
767            ("$.payload.code", Some(404)),
768            ("$.payload.success", None),
769            ("$.payload.result.time_cost", None),
770            ("$.payload.not-exists", None),
771            ("$.not-exists", None),
772            ("$", None),
773        ];
774
775        let mut jsons = json_strings
776            .iter()
777            .map(|s| {
778                let value = jsonb::parse_value(s.as_bytes()).unwrap();
779                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
780            })
781            .collect::<Vec<_>>();
782        let json_struct_arrays =
783            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
784        jsons.extend(json_struct_arrays);
785
786        for i in 0..jsons.len() {
787            let json = &jsons[i];
788            let (path, expect) = path_expects[i];
789
790            let args = ScalarFunctionArgs {
791                args: vec![
792                    ColumnarValue::Array(json.clone()),
793                    ColumnarValue::Scalar(path.into()),
794                ],
795                arg_fields: vec![],
796                number_rows: 1,
797                return_field: Arc::new(Field::new("x", DataType::Int64, false)),
798                config_options: Arc::new(Default::default()),
799            };
800            let result = json_get_int
801                .invoke_with_args(args)
802                .and_then(|x| x.to_array(1))
803                .unwrap();
804
805            let result = result.as_primitive::<Int64Type>();
806            assert_eq!(1, result.len());
807            let actual = result.is_valid(0).then(|| result.value(0));
808            assert_eq!(actual, expect);
809        }
810    }
811
812    #[test]
813    fn test_json_get_float() {
814        let json_get_float = JsonGetFloat::default();
815
816        assert_eq!("json_get_float", json_get_float.name());
817        assert_eq!(
818            DataType::Float64,
819            json_get_float
820                .return_type(&[DataType::Binary, DataType::Utf8])
821                .unwrap()
822        );
823
824        let json_strings = [
825            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
826            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
827            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
828        ];
829        let json_struct = test_json_struct();
830
831        let path_expects = vec![
832            ("$.a.b", Some(2.1)),
833            ("$.a", Some(4.4)),
834            ("$.c", None),
835            ("$.kind", None),
836            ("$.payload.code", None),
837            ("$.payload.success", None),
838            ("$.payload.result.time_cost", Some(1.234)),
839            ("$.payload.not-exists", None),
840            ("$.not-exists", None),
841            ("$", None),
842        ];
843
844        let mut jsons = json_strings
845            .iter()
846            .map(|s| {
847                let value = jsonb::parse_value(s.as_bytes()).unwrap();
848                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
849            })
850            .collect::<Vec<_>>();
851        let json_struct_arrays =
852            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
853        jsons.extend(json_struct_arrays);
854
855        for i in 0..jsons.len() {
856            let json = &jsons[i];
857            let (path, expect) = path_expects[i];
858
859            let args = ScalarFunctionArgs {
860                args: vec![
861                    ColumnarValue::Array(json.clone()),
862                    ColumnarValue::Scalar(path.into()),
863                ],
864                arg_fields: vec![],
865                number_rows: 1,
866                return_field: Arc::new(Field::new("x", DataType::Float64, false)),
867                config_options: Arc::new(Default::default()),
868            };
869            let result = json_get_float
870                .invoke_with_args(args)
871                .and_then(|x| x.to_array(1))
872                .unwrap();
873
874            let result = result.as_primitive::<Float64Type>();
875            assert_eq!(1, result.len());
876            let actual = result.is_valid(0).then(|| result.value(0));
877            assert_eq!(actual, expect);
878        }
879    }
880
881    #[test]
882    fn test_json_get_bool() {
883        let json_get_bool = JsonGetBool::default();
884
885        assert_eq!("json_get_bool", json_get_bool.name());
886        assert_eq!(
887            DataType::Boolean,
888            json_get_bool
889                .return_type(&[DataType::Binary, DataType::Utf8])
890                .unwrap()
891        );
892
893        let json_strings = [
894            r#"{"a": {"b": true}, "b": false, "c": true}"#,
895            r#"{"a": false, "b": {"c": true}, "c": false}"#,
896            r#"{"a": true, "b": false, "c": {"a": true}}"#,
897        ];
898        let json_struct = test_json_struct();
899
900        let path_expects = vec![
901            ("$.a.b", Some(true)),
902            ("$.a", Some(false)),
903            ("$.c", None),
904            ("$.kind", None),
905            ("$.payload.code", None),
906            ("$.payload.success", Some(false)),
907            ("$.payload.result.time_cost", None),
908            ("$.payload.not-exists", None),
909            ("$.not-exists", None),
910            ("$", None),
911        ];
912
913        let mut jsons = json_strings
914            .iter()
915            .map(|s| {
916                let value = jsonb::parse_value(s.as_bytes()).unwrap();
917                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
918            })
919            .collect::<Vec<_>>();
920        let json_struct_arrays =
921            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
922        jsons.extend(json_struct_arrays);
923
924        for i in 0..jsons.len() {
925            let json = &jsons[i];
926            let (path, expect) = path_expects[i];
927
928            let args = ScalarFunctionArgs {
929                args: vec![
930                    ColumnarValue::Array(json.clone()),
931                    ColumnarValue::Scalar(path.into()),
932                ],
933                arg_fields: vec![],
934                number_rows: 1,
935                return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
936                config_options: Arc::new(Default::default()),
937            };
938            let result = json_get_bool
939                .invoke_with_args(args)
940                .and_then(|x| x.to_array(1))
941                .unwrap();
942
943            let result = result.as_boolean();
944            assert_eq!(1, result.len());
945            let actual = result.is_valid(0).then(|| result.value(0));
946            assert_eq!(actual, expect);
947        }
948    }
949
950    #[test]
951    fn test_json_get_string() {
952        let json_get_string = JsonGetString::default();
953
954        assert_eq!("json_get_string", json_get_string.name());
955        assert_eq!(
956            DataType::Utf8View,
957            json_get_string
958                .return_type(&[DataType::Binary, DataType::Utf8])
959                .unwrap()
960        );
961
962        let json_strings = [
963            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
964            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
965            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
966        ];
967        let json_struct = test_json_struct();
968
969        let paths = vec![
970            "$.a.b",
971            "$.a",
972            "",
973            "$.kind",
974            "$.payload.code",
975            "$.payload.result.time_cost",
976            "$.payload",
977            "$.payload.success",
978            "$.payload.result",
979            "$.payload.result.error",
980            "$.payload.result.not-exists",
981            "$.payload.not-exists",
982            "$.not-exists",
983            "$",
984        ];
985        let expects = [
986            Some("a"),
987            Some("d"),
988            None,
989            Some("foo"),
990            Some("404"),
991            Some("1.234"),
992            Some(
993                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
994            ),
995            Some("false"),
996            Some(r#"{"error":"not found","time_cost":1.234}"#),
997            Some("not found"),
998            None,
999            None,
1000            None,
1001            Some(
1002                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1003            ),
1004        ];
1005
1006        let mut jsons = json_strings
1007            .iter()
1008            .map(|s| {
1009                let value = jsonb::parse_value(s.as_bytes()).unwrap();
1010                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1011            })
1012            .collect::<Vec<_>>();
1013        let json_struct_arrays =
1014            std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1015        jsons.extend(json_struct_arrays);
1016
1017        for i in 0..jsons.len() {
1018            let json = &jsons[i];
1019            let path = paths[i];
1020            let expect = expects[i];
1021
1022            let args = ScalarFunctionArgs {
1023                args: vec![
1024                    ColumnarValue::Array(json.clone()),
1025                    ColumnarValue::Scalar(path.into()),
1026                ],
1027                arg_fields: vec![],
1028                number_rows: 1,
1029                return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1030                config_options: Arc::new(Default::default()),
1031            };
1032            let result = json_get_string
1033                .invoke_with_args(args)
1034                .and_then(|x| x.to_array(1))
1035                .unwrap();
1036
1037            let result = result.as_string_view();
1038            assert_eq!(1, result.len());
1039            let actual = result.is_valid(0).then(|| result.value(0));
1040            assert_eq!(actual, expect);
1041        }
1042    }
1043
1044    #[test]
1045    fn test_json_get_object() -> Result<()> {
1046        let udf = JsonGetObject::default();
1047        assert_eq!("json_get_object", udf.name());
1048        assert_eq!(
1049            DataType::BinaryView,
1050            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
1051        );
1052
1053        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
1054        let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
1055        let number_rows = paths.len();
1056
1057        let args = ScalarFunctionArgs {
1058            args: vec![
1059                ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
1060                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
1061            ],
1062            arg_fields: vec![],
1063            number_rows,
1064            return_field: Arc::new(Field::new("x", DataType::Binary, false)),
1065            config_options: Arc::new(Default::default()),
1066        };
1067        let result = udf
1068            .invoke_with_args(args)
1069            .and_then(|x| x.to_array(number_rows))?;
1070        let result = result.as_binary_view();
1071
1072        let expected = &BinaryViewArray::from_iter(
1073            vec![
1074                Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
1075                Some(r#"{"b": {"c": {"d": 1}}}"#),
1076                Some(r#"{"c": {"d": 1}}"#),
1077                Some(r#"{"d": 1}"#),
1078                None,
1079                None,
1080                None,
1081            ]
1082            .into_iter()
1083            .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
1084        );
1085        assert_eq!(result, expected);
1086        Ok(())
1087    }
1088
1089    #[test]
1090    fn test_json_get_with_type() {
1091        let json_get_with_type = JsonGetWithType::default();
1092
1093        assert_eq!("json_get", json_get_with_type.name());
1094
1095        let json_strings = [
1096            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
1097            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
1098            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
1099        ];
1100        let json_struct = test_json_struct();
1101
1102        let paths = vec![
1103            "$.a.b",
1104            "$.a",
1105            "",
1106            "$.kind",
1107            "$.payload.code",
1108            "$.payload.result.time_cost",
1109            "$.payload",
1110            "$.payload.success",
1111            "$.payload.result",
1112            "$.payload.result.error",
1113            "$.payload.result.not-exists",
1114            "$.payload.not-exists",
1115            "$.not-exists",
1116            "$",
1117        ];
1118        let expects = [
1119            Some("a"),
1120            Some("d"),
1121            None,
1122            Some("foo"),
1123            Some("404"),
1124            Some("1.234"),
1125            Some(
1126                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
1127            ),
1128            Some("false"),
1129            Some(r#"{"error":"not found","time_cost":1.234}"#),
1130            Some("not found"),
1131            None,
1132            None,
1133            None,
1134            Some(
1135                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1136            ),
1137        ];
1138
1139        let mut jsons = json_strings
1140            .iter()
1141            .map(|s| {
1142                let value = jsonb::parse_value(s.as_bytes()).unwrap();
1143                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1144            })
1145            .collect::<Vec<_>>();
1146        let json_struct_arrays =
1147            std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1148        jsons.extend(json_struct_arrays);
1149
1150        for i in 0..jsons.len() {
1151            let json = &jsons[i];
1152            let path = paths[i];
1153            let expect = expects[i];
1154
1155            let args = ScalarFunctionArgs {
1156                args: vec![
1157                    ColumnarValue::Array(json.clone()),
1158                    ColumnarValue::Scalar(path.into()),
1159                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("string".to_string()))),
1160                ],
1161                arg_fields: vec![],
1162                number_rows: 1,
1163                return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1164                config_options: Arc::new(Default::default()),
1165            };
1166            let result = json_get_with_type
1167                .invoke_with_args(args)
1168                .and_then(|x| x.to_array(1))
1169                .unwrap();
1170
1171            let result = result.as_string_view();
1172            assert_eq!(1, result.len());
1173            let actual = result.is_valid(0).then(|| result.value(0));
1174            assert_eq!(actual, expect);
1175        }
1176
1177        let json_strings = [
1178            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
1179            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
1180            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
1181        ];
1182        let paths = ["$.a.b", "$.a", "$.c", "$.payload.code"];
1183        let expects = [Some(2), Some(4), None, Some(404)];
1184
1185        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1186            let json = if i < json_strings.len() {
1187                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1188                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1189            } else {
1190                test_json_struct()
1191            };
1192
1193            let args = ScalarFunctionArgs {
1194                args: vec![
1195                    ColumnarValue::Array(json),
1196                    ColumnarValue::Scalar((*path).into()),
1197                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("int".to_string()))),
1198                ],
1199                arg_fields: vec![],
1200                number_rows: 1,
1201                return_field: Arc::new(Field::new("x", DataType::Int64, false)),
1202                config_options: Arc::new(Default::default()),
1203            };
1204            let result = json_get_with_type
1205                .invoke_with_args(args)
1206                .and_then(|x| x.to_array(1))
1207                .unwrap();
1208
1209            let result = result.as_primitive::<Int64Type>();
1210            assert_eq!(1, result.len());
1211            let actual = result.is_valid(0).then(|| result.value(0));
1212            assert_eq!(actual, *expect);
1213        }
1214
1215        let json_strings = [
1216            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
1217            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
1218            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
1219        ];
1220        let paths = ["$.a.b", "$.a", "$.c", "$.payload.result.time_cost"];
1221        let expects = [Some(2.1), Some(4.4), None, Some(1.234)];
1222
1223        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1224            let json = if i < json_strings.len() {
1225                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1226                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1227            } else {
1228                test_json_struct()
1229            };
1230
1231            let args = ScalarFunctionArgs {
1232                args: vec![
1233                    ColumnarValue::Array(json),
1234                    ColumnarValue::Scalar((*path).into()),
1235                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("float".to_string()))),
1236                ],
1237                arg_fields: vec![],
1238                number_rows: 1,
1239                return_field: Arc::new(Field::new("x", DataType::Float64, false)),
1240                config_options: Arc::new(Default::default()),
1241            };
1242            let result = json_get_with_type
1243                .invoke_with_args(args)
1244                .and_then(|x| x.to_array(1))
1245                .unwrap();
1246
1247            let result = result.as_primitive::<Float64Type>();
1248            assert_eq!(1, result.len());
1249            let actual = result.is_valid(0).then(|| result.value(0));
1250            assert_eq!(actual, *expect);
1251        }
1252
1253        let json_strings = [
1254            r#"{"a": {"b": true}, "b": false, "c": true}"#,
1255            r#"{"a": false, "b": {"c": true}, "c": false}"#,
1256            r#"{"a": true, "b": false, "c": {"a": true}}"#,
1257        ];
1258        let paths = ["$.a.b", "$.a", "$.c", "$.payload.success"];
1259        let expects = [Some(true), Some(false), None, Some(false)];
1260
1261        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1262            let json = if i < json_strings.len() {
1263                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1264                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1265            } else {
1266                test_json_struct()
1267            };
1268
1269            let args = ScalarFunctionArgs {
1270                args: vec![
1271                    ColumnarValue::Array(json),
1272                    ColumnarValue::Scalar((*path).into()),
1273                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("bool".to_string()))),
1274                ],
1275                arg_fields: vec![],
1276                number_rows: 1,
1277                return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
1278                config_options: Arc::new(Default::default()),
1279            };
1280            let result = json_get_with_type
1281                .invoke_with_args(args)
1282                .and_then(|x| x.to_array(1))
1283                .unwrap();
1284
1285            let result = result.as_boolean();
1286            assert_eq!(1, result.len());
1287            let actual = result.is_valid(0).then(|| result.value(0));
1288            assert_eq!(actual, *expect);
1289        }
1290    }
1291}