common_function/scalars/json/
json_get.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16use std::sync::Arc;
17
18use arrow::array::{ArrayRef, BinaryViewArray, StringViewArray, StructArray};
19use arrow::compute;
20use arrow::datatypes::{Float64Type, Int64Type, UInt64Type};
21use arrow_schema::Field;
22use datafusion_common::arrow::array::{
23    Array, AsArray, BinaryViewBuilder, BooleanBuilder, Float64Builder, Int64Builder,
24    StringViewBuilder,
25};
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::{DataFusionError, Result, ScalarValue};
28use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
29use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
30use datatypes::json::JsonStructureSettings;
31use derive_more::Display;
32use jsonpath_rust::JsonPath;
33use serde_json::Value;
34
35use crate::function::{Function, extract_args};
36use crate::helper;
37
38fn get_json_by_path(json: &[u8], path: &str) -> Option<Vec<u8>> {
39    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes());
40    match json_path {
41        Ok(json_path) => {
42            let mut sub_jsonb = Vec::new();
43            let mut sub_offsets = Vec::new();
44            match jsonb::get_by_path(json, json_path, &mut sub_jsonb, &mut sub_offsets) {
45                Ok(_) => Some(sub_jsonb),
46                Err(_) => None,
47            }
48        }
49        _ => None,
50    }
51}
52
/// A single extracted JSON value handed to a [`JsonGetResultBuilder`].
///
/// The variant records where the value came from, so each builder can pick
/// the matching conversion.
enum JsonResultValue<'a> {
    /// JSONB-encoded bytes extracted from a binary JSON input.
    Jsonb(Vec<u8>),
    /// A column of a JSON struct array together with the row index to read.
    JsonStructByColumn(&'a ArrayRef, usize),
    /// A `serde_json` value found by evaluating a JSON path.
    JsonStructByValue(&'a Value),
}
58
/// Row-by-row sink that materializes extracted JSON values into an Arrow array
/// of one concrete output type.
trait JsonGetResultBuilder {
    /// Appends one row converted from `value`.
    ///
    /// Returns an error if the conversion itself fails.
    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;

    /// Appends one null row.
    fn append_null(&mut self);

    /// Finishes the builder and returns the accumulated rows as an array.
    fn build(&mut self) -> ArrayRef;
}
66
/// Common implementation for JSON get scalar functions.
///
/// `JsonGet` encapsulates the logic for extracting values from JSON inputs
/// based on a path expression. Different JSON get functions reuse this
/// implementation by supplying their own `JsonGetResultBuilder` to control
/// how the resulting values are materialized into an Arrow array.
#[derive(Debug)]
struct JsonGet {
    /// Signature shared by every function that wraps this type.
    signature: Signature,
}
77
78impl JsonGet {
79    fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
80    where
81        F: Fn(usize) -> B,
82        B: JsonGetResultBuilder,
83    {
84        let [arg0, arg1] = extract_args("JSON_GET", &args)?;
85
86        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
87        let paths = arg1.as_string_view();
88
89        let mut builder = (builder_factory)(arg0.len());
90        match arg0.data_type() {
91            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
92                let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
93                let jsons = arg0.as_binary_view();
94                jsonb_get(jsons, paths, &mut builder)?;
95            }
96            DataType::Struct(_) => {
97                let jsons = arg0.as_struct();
98                json_struct_get(jsons, paths, &mut builder)?
99            }
100            _ => {
101                return Err(DataFusionError::Execution(format!(
102                    "JSON_GET not supported argument type {}",
103                    arg0.data_type(),
104                )));
105            }
106        };
107
108        Ok(ColumnarValue::Array(builder.build()))
109    }
110}
111
112impl Default for JsonGet {
113    fn default() -> Self {
114        Self {
115            signature: Signature::any(2, Volatility::Immutable),
116        }
117    }
118}
119
120struct StringResultBuilder(StringViewBuilder);
121
122impl JsonGetResultBuilder for StringResultBuilder {
123    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
124        match value {
125            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_str(&value).ok()),
126            JsonResultValue::JsonStructByColumn(column, i) => {
127                if let Some(v) = string_array_value_at_index(column, i) {
128                    self.0.append_value(v);
129                } else {
130                    self.0
131                        .append_value(arrow_cast::display::array_value_to_string(column, i)?);
132                }
133            }
134            JsonResultValue::JsonStructByValue(value) => {
135                if let Some(s) = value.as_str() {
136                    self.0.append_value(s)
137                } else {
138                    self.0.append_value(value.to_string())
139                }
140            }
141        }
142        Ok(())
143    }
144
145    fn append_null(&mut self) {
146        self.0.append_null();
147    }
148
149    fn build(&mut self) -> ArrayRef {
150        Arc::new(self.0.finish())
151    }
152}
153
154#[derive(Default, Display, Debug)]
155#[display("{}", Self::NAME.to_ascii_uppercase())]
156pub struct JsonGetString(JsonGet);
157
158impl JsonGetString {
159    pub const NAME: &'static str = "json_get_string";
160}
161
162impl Function for JsonGetString {
163    fn name(&self) -> &str {
164        Self::NAME
165    }
166
167    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
168        Ok(DataType::Utf8View)
169    }
170
171    fn signature(&self) -> &Signature {
172        &self.0.signature
173    }
174
175    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
176        self.0.invoke(args, |len: usize| {
177            StringResultBuilder(StringViewBuilder::with_capacity(len))
178        })
179    }
180}
181
182struct IntResultBuilder(Int64Builder);
183
184impl JsonGetResultBuilder for IntResultBuilder {
185    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
186        match value {
187            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_i64(&value).ok()),
188            JsonResultValue::JsonStructByColumn(column, i) => {
189                self.0.append_option(int_array_value_at_index(column, i))
190            }
191            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_i64()),
192        }
193        Ok(())
194    }
195
196    fn append_null(&mut self) {
197        self.0.append_null();
198    }
199
200    fn build(&mut self) -> ArrayRef {
201        Arc::new(self.0.finish())
202    }
203}
204
205#[derive(Default, Display, Debug)]
206#[display("{}", Self::NAME.to_ascii_uppercase())]
207pub struct JsonGetInt(JsonGet);
208
209impl JsonGetInt {
210    pub const NAME: &'static str = "json_get_int";
211}
212
213impl Function for JsonGetInt {
214    fn name(&self) -> &str {
215        Self::NAME
216    }
217
218    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
219        Ok(DataType::Int64)
220    }
221
222    fn signature(&self) -> &Signature {
223        &self.0.signature
224    }
225
226    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
227        self.0.invoke(args, |len: usize| {
228            IntResultBuilder(Int64Builder::with_capacity(len))
229        })
230    }
231}
232
233struct FloatResultBuilder(Float64Builder);
234
235impl JsonGetResultBuilder for FloatResultBuilder {
236    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
237        match value {
238            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_f64(&value).ok()),
239            JsonResultValue::JsonStructByColumn(column, i) => {
240                let result = if column.data_type() == &DataType::Float64 {
241                    column
242                        .as_primitive::<Float64Type>()
243                        .is_valid(i)
244                        .then(|| column.as_primitive::<Float64Type>().value(i))
245                } else {
246                    None
247                };
248                self.0.append_option(result);
249            }
250            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_f64()),
251        }
252        Ok(())
253    }
254
255    fn append_null(&mut self) {
256        self.0.append_null();
257    }
258
259    fn build(&mut self) -> ArrayRef {
260        Arc::new(self.0.finish())
261    }
262}
263
264#[derive(Default, Display, Debug)]
265#[display("{}", Self::NAME.to_ascii_uppercase())]
266pub struct JsonGetFloat(JsonGet);
267
268impl JsonGetFloat {
269    pub const NAME: &'static str = "json_get_float";
270}
271
272impl Function for JsonGetFloat {
273    fn name(&self) -> &str {
274        Self::NAME
275    }
276
277    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
278        Ok(DataType::Float64)
279    }
280
281    fn signature(&self) -> &Signature {
282        &self.0.signature
283    }
284
285    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
286        self.0.invoke(args, |len: usize| {
287            FloatResultBuilder(Float64Builder::with_capacity(len))
288        })
289    }
290}
291
292struct BoolResultBuilder(BooleanBuilder);
293
294impl JsonGetResultBuilder for BoolResultBuilder {
295    fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
296        match value {
297            JsonResultValue::Jsonb(value) => self.0.append_option(jsonb::to_bool(&value).ok()),
298            JsonResultValue::JsonStructByColumn(column, i) => {
299                let result = if column.data_type() == &DataType::Boolean {
300                    column
301                        .as_boolean()
302                        .is_valid(i)
303                        .then(|| column.as_boolean().value(i))
304                } else {
305                    None
306                };
307                self.0.append_option(result);
308            }
309            JsonResultValue::JsonStructByValue(value) => self.0.append_option(value.as_bool()),
310        }
311        Ok(())
312    }
313
314    fn append_null(&mut self) {
315        self.0.append_null();
316    }
317
318    fn build(&mut self) -> ArrayRef {
319        Arc::new(self.0.finish())
320    }
321}
322
323#[derive(Default, Display, Debug)]
324#[display("{}", Self::NAME.to_ascii_uppercase())]
325pub struct JsonGetBool(JsonGet);
326
327impl JsonGetBool {
328    pub const NAME: &'static str = "json_get_bool";
329}
330
331impl Function for JsonGetBool {
332    fn name(&self) -> &str {
333        Self::NAME
334    }
335
336    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
337        Ok(DataType::Boolean)
338    }
339
340    fn signature(&self) -> &Signature {
341        &self.0.signature
342    }
343
344    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
345        self.0.invoke(args, |len: usize| {
346            BoolResultBuilder(BooleanBuilder::with_capacity(len))
347        })
348    }
349}
350
351fn jsonb_get(
352    jsons: &BinaryViewArray,
353    paths: &StringViewArray,
354    builder: &mut dyn JsonGetResultBuilder,
355) -> Result<()> {
356    let size = jsons.len();
357    for i in 0..size {
358        let json = jsons.is_valid(i).then(|| jsons.value(i));
359        let path = paths.is_valid(i).then(|| paths.value(i));
360        let result = match (json, path) {
361            (Some(json), Some(path)) => get_json_by_path(json, path),
362            _ => None,
363        };
364        if let Some(v) = result {
365            builder.append_value(JsonResultValue::Jsonb(v))?;
366        } else {
367            builder.append_null();
368        }
369    }
370    Ok(())
371}
372
373fn json_struct_get(
374    jsons: &StructArray,
375    paths: &StringViewArray,
376    builder: &mut dyn JsonGetResultBuilder,
377) -> Result<()> {
378    let size = jsons.len();
379    for i in 0..size {
380        if jsons.is_null(i) || paths.is_null(i) {
381            builder.append_null();
382            continue;
383        }
384        let path = paths.value(i);
385
386        // naively assume the JSON path is our kind of indexing to the field, by removing its "root"
387        let field_path = path.trim().replace("$.", "");
388        let column = jsons.column_by_name(&field_path);
389
390        if let Some(column) = column {
391            builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
392        } else {
393            let Some(raw) = jsons
394                .column_by_name(JsonStructureSettings::RAW_FIELD)
395                .and_then(|x| string_array_value_at_index(x, i))
396            else {
397                builder.append_null();
398                continue;
399            };
400
401            let path: JsonPath<Value> = JsonPath::try_from(path).map_err(|e| {
402                DataFusionError::Execution(format!("{path} is not a valid JSON path: {e}"))
403            })?;
404            // the wanted field is not retrievable from the JSON struct columns directly, we have
405            // to combine everything (columns and the "_raw") into a complete JSON value to find it
406            let value = json_struct_to_value(raw, jsons, i)?;
407
408            match path.find(&value) {
409                Value::Null => builder.append_null(),
410                Value::Array(values) => match values.as_slice() {
411                    [] => builder.append_null(),
412                    [x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
413                    _ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
414                },
415                value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
416            }
417        }
418    }
419
420    Ok(())
421}
422
/// Rebuilds the complete JSON value of row `i` of a JSON struct array.
///
/// `raw` is parsed as the base JSON document; then, for every struct column
/// other than the raw field, the non-null cell at row `i` is inserted into it.
/// A dotted column name such as `a.b.c` is interpreted as field `c` of the
/// object at JSON pointer `/a/b`; a name without dots is inserted at the root.
///
/// # Errors
///
/// * `Internal` if `raw` is not valid JSON.
/// * `Internal` if the parent location of a column does not resolve to a JSON
///   object inside the parsed document.
/// * `NotImplemented` if a column has a data type other than boolean, 64-bit
///   integer/float, or one of the string types.
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
    let Ok(mut json) = Value::from_str(raw) else {
        return Err(DataFusionError::Internal(format!(
            "inner field '{}' is not a valid JSON string",
            JsonStructureSettings::RAW_FIELD
        )));
    };

    for (column_name, column) in jsons.column_names().into_iter().zip(jsons.columns()) {
        // The raw field is the base document itself, not a value to merge in.
        if column_name == JsonStructureSettings::RAW_FIELD {
            continue;
        }

        // Split "a.b.c" into the parent pointer "/a/b" and the leaf field "c".
        // An empty pointer addresses the document root.
        let (json_pointer, field) = if let Some((json_object, field)) = column_name.rsplit_once(".")
        {
            let json_pointer = format!("/{}", json_object.replace(".", "/"));
            (json_pointer, field)
        } else {
            ("".to_string(), column_name)
        };
        // The parent must already exist as an object in the parsed document;
        // missing parents are not created here.
        let Some(json_object) = json
            .pointer_mut(&json_pointer)
            .and_then(|x| x.as_object_mut())
        else {
            return Err(DataFusionError::Internal(format!(
                "value at JSON pointer '{}' is not an object",
                json_pointer
            )));
        };

        // Inserts the cell at row `$i` of the typed array `$column` under `$field`,
        // skipping null cells so they leave the document untouched.
        macro_rules! insert {
            ($column: ident, $i: ident, $json_object: ident, $field: ident) => {{
                if let Some(value) = $column
                    .is_valid($i)
                    .then(|| serde_json::Value::from($column.value($i)))
                {
                    $json_object.insert($field.to_string(), value);
                }
            }};
        }

        match column.data_type() {
            // boolean => Value::Bool
            DataType::Boolean => {
                let column = column.as_boolean();
                insert!(column, i, json_object, field);
            }
            // int => Value::Number
            DataType::Int64 => {
                let column = column.as_primitive::<Int64Type>();
                insert!(column, i, json_object, field);
            }
            DataType::UInt64 => {
                let column = column.as_primitive::<UInt64Type>();
                insert!(column, i, json_object, field);
            }
            DataType::Float64 => {
                let column = column.as_primitive::<Float64Type>();
                insert!(column, i, json_object, field);
            }
            // string => Value::String
            DataType::Utf8 => {
                let column = column.as_string::<i32>();
                insert!(column, i, json_object, field);
            }
            DataType::LargeUtf8 => {
                let column = column.as_string::<i64>();
                insert!(column, i, json_object, field);
            }
            DataType::Utf8View => {
                let column = column.as_string_view();
                insert!(column, i, json_object, field);
            }
            // other => Value::Array and Value::Object
            _ => {
                return Err(DataFusionError::NotImplemented(format!(
                    "{} is not yet supported to be executed with field {} of datatype {}",
                    JsonGetString::NAME,
                    column_name,
                    column.data_type()
                )));
            }
        }
    }
    Ok(json)
}
509
510#[derive(Debug, Display)]
511#[display("{}", Self::NAME.to_ascii_uppercase())]
512pub(super) struct JsonGetWithType {
513    signature: Signature,
514}
515
516impl JsonGetWithType {
517    const NAME: &'static str = "json_get";
518}
519
520impl Default for JsonGetWithType {
521    fn default() -> Self {
522        Self {
523            signature: Signature::any(3, Volatility::Immutable),
524        }
525    }
526}
527
528impl Function for JsonGetWithType {
529    fn name(&self) -> &str {
530        Self::NAME
531    }
532
533    fn return_type(&self, _input_types: &[DataType]) -> datafusion_common::Result<DataType> {
534        Err(DataFusionError::Internal(
535            "This method isn't meant to be called".to_string(),
536        ))
537    }
538
539    fn return_field_from_args(
540        &self,
541        args: datafusion_expr::ReturnFieldArgs<'_>,
542    ) -> datafusion_common::Result<Arc<Field>> {
543        match args.scalar_arguments[2] {
544            Some(ScalarValue::Utf8(Some(type_str)))
545            | Some(ScalarValue::Utf8View(Some(type_str)))
546            | Some(ScalarValue::LargeUtf8(Some(type_str))) => {
547                let type_str = type_str.to_ascii_lowercase();
548                match type_str.as_str() {
549                    "bool" | "boolean" => {
550                        Ok(Arc::new(Field::new(self.name(), DataType::Boolean, true)))
551                    }
552                    "int" | "integer" => {
553                        Ok(Arc::new(Field::new(self.name(), DataType::Int64, true)))
554                    }
555                    "float" | "double" => {
556                        Ok(Arc::new(Field::new(self.name(), DataType::Float64, true)))
557                    }
558                    "string" => Ok(Arc::new(Field::new(self.name(), DataType::Utf8View, true))),
559                    _ => Err(DataFusionError::Internal(format!(
560                        "Unsupported type: {}",
561                        type_str
562                    ))),
563                }
564            }
565            _ => Err(DataFusionError::Internal(
566                "Invalid argument provided for type".to_string(),
567            )),
568        }
569    }
570
571    fn signature(&self) -> &Signature {
572        &self.signature
573    }
574
575    fn invoke_with_args(
576        &self,
577        args: ScalarFunctionArgs,
578    ) -> datafusion_common::Result<ColumnarValue> {
579        let [arg0, arg1, _] = extract_args("JSON_GET", &args)?;
580        let len = arg0.len();
581
582        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
583        let paths = arg1.as_string_view();
584
585        // mapping datatypes returned from return_field_from_args
586        let mut builder: Box<dyn JsonGetResultBuilder> = match args.return_field.data_type() {
587            DataType::Utf8View => {
588                Box::new(StringResultBuilder(StringViewBuilder::with_capacity(len)))
589            }
590            DataType::Int64 => Box::new(IntResultBuilder(Int64Builder::with_capacity(len))),
591            DataType::Float64 => Box::new(FloatResultBuilder(Float64Builder::with_capacity(len))),
592            DataType::Boolean => Box::new(BoolResultBuilder(BooleanBuilder::with_capacity(len))),
593            _ => {
594                return Err(DataFusionError::Internal(
595                    "Unsupported return type".to_string(),
596                ));
597            }
598        };
599
600        match arg0.data_type() {
601            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
602                let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
603                let jsons = arg0.as_binary_view();
604                jsonb_get(jsons, paths, builder.as_mut())?;
605            }
606            DataType::Struct(_) => {
607                let jsons = arg0.as_struct();
608                json_struct_get(jsons, paths, builder.as_mut())?;
609            }
610            _ => {
611                return Err(DataFusionError::Execution(format!(
612                    "JSON_GET not supported argument type {}",
613                    arg0.data_type(),
614                )));
615            }
616        };
617
618        Ok(ColumnarValue::Array(builder.build()))
619    }
620}
621
/// Get the object from JSON value by path.
///
/// Displays as the upper-cased function name.
#[derive(Display, Debug)]
#[display("{}", Self::NAME.to_ascii_uppercase())]
pub(super) struct JsonGetObject {
    /// Accepted (JSON, path) argument type combinations.
    signature: Signature,
}

impl JsonGetObject {
    /// Name under which the function is registered.
    const NAME: &'static str = "json_get_object";
}
632
633impl Default for JsonGetObject {
634    fn default() -> Self {
635        Self {
636            signature: helper::one_of_sigs2(
637                vec![
638                    DataType::Binary,
639                    DataType::LargeBinary,
640                    DataType::BinaryView,
641                ],
642                vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
643            ),
644        }
645    }
646}
647
648impl Function for JsonGetObject {
649    fn name(&self) -> &str {
650        Self::NAME
651    }
652
653    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
654        Ok(DataType::BinaryView)
655    }
656
657    fn signature(&self) -> &Signature {
658        &self.signature
659    }
660
661    fn invoke_with_args(
662        &self,
663        args: ScalarFunctionArgs,
664    ) -> datafusion_common::Result<ColumnarValue> {
665        let [arg0, arg1] = extract_args(self.name(), &args)?;
666        let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
667        let jsons = arg0.as_binary_view();
668        let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
669        let paths = arg1.as_string_view();
670
671        let len = jsons.len();
672        let mut builder = BinaryViewBuilder::with_capacity(len);
673
674        for i in 0..len {
675            let json = jsons.is_valid(i).then(|| jsons.value(i));
676            let path = paths.is_valid(i).then(|| paths.value(i));
677            let result = if let (Some(json), Some(path)) = (json, path) {
678                let result = jsonb::jsonpath::parse_json_path(path.as_bytes()).and_then(|path| {
679                    let mut data = Vec::new();
680                    let mut offset = Vec::new();
681                    jsonb::get_by_path(json, path, &mut data, &mut offset)
682                        .map(|()| jsonb::is_object(&data).then_some(data))
683                });
684                result.map_err(|e| DataFusionError::Execution(e.to_string()))?
685            } else {
686                None
687            };
688            builder.append_option(result);
689        }
690
691        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use std::sync::Arc;
698
699    use arrow::array::{Float64Array, Int64Array, StructArray};
700    use arrow_schema::Field;
701    use datafusion_common::ScalarValue;
702    use datafusion_common::arrow::array::{BinaryArray, BinaryViewArray, StringArray};
703    use datafusion_common::arrow::datatypes::{Float64Type, Int64Type};
704    use datatypes::types::parse_string_to_jsonb;
705    use serde_json::json;
706
707    use super::*;
708
709    /// Create a JSON object like this (as a one element struct array for testing):
710    ///
711    /// ```JSON
712    /// {
713    ///     "kind": "foo",
714    ///     "payload": {
715    ///         "code": 404,
716    ///         "success": false,
717    ///         "result": {
718    ///             "error": "not found",
719    ///             "time_cost": 1.234
720    ///         }
721    ///     }
722    /// }
723    /// ```
724    fn test_json_struct() -> ArrayRef {
725        Arc::new(StructArray::new(
726            vec![
727                Field::new("kind", DataType::Utf8, true),
728                Field::new("payload.code", DataType::Int64, true),
729                Field::new("payload.result.time_cost", DataType::Float64, true),
730                Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
731            ]
732            .into(),
733            vec![
734                Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
735                Arc::new(Int64Array::from_iter([Some(404)])),
736                Arc::new(Float64Array::from_iter([Some(1.234)])),
737                Arc::new(StringViewArray::from_iter([Some(
738                    json! ({
739                        "payload": {
740                            "success": false,
741                            "result": {
742                                "error": "not found"
743                            }
744                        }
745                    })
746                    .to_string(),
747                )])),
748            ],
749            None,
750        ))
751    }
752
753    #[test]
754    fn test_json_get_int() {
755        let json_get_int = JsonGetInt::default();
756
757        assert_eq!("json_get_int", json_get_int.name());
758        assert_eq!(
759            DataType::Int64,
760            json_get_int
761                .return_type(&[DataType::Binary, DataType::Utf8])
762                .unwrap()
763        );
764
765        let json_strings = [
766            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
767            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
768            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
769        ];
770        let json_struct = test_json_struct();
771
772        let path_expects = vec![
773            ("$.a.b", Some(2)),
774            ("$.a", Some(4)),
775            ("$.c", None),
776            ("$.kind", None),
777            ("$.payload.code", Some(404)),
778            ("$.payload.success", None),
779            ("$.payload.result.time_cost", None),
780            ("$.payload.not-exists", None),
781            ("$.not-exists", None),
782            ("$", None),
783        ];
784
785        let mut jsons = json_strings
786            .iter()
787            .map(|s| {
788                let value = jsonb::parse_value(s.as_bytes()).unwrap();
789                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
790            })
791            .collect::<Vec<_>>();
792        let json_struct_arrays =
793            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
794        jsons.extend(json_struct_arrays);
795
796        for i in 0..jsons.len() {
797            let json = &jsons[i];
798            let (path, expect) = path_expects[i];
799
800            let args = ScalarFunctionArgs {
801                args: vec![
802                    ColumnarValue::Array(json.clone()),
803                    ColumnarValue::Scalar(path.into()),
804                ],
805                arg_fields: vec![],
806                number_rows: 1,
807                return_field: Arc::new(Field::new("x", DataType::Int64, false)),
808                config_options: Arc::new(Default::default()),
809            };
810            let result = json_get_int
811                .invoke_with_args(args)
812                .and_then(|x| x.to_array(1))
813                .unwrap();
814
815            let result = result.as_primitive::<Int64Type>();
816            assert_eq!(1, result.len());
817            let actual = result.is_valid(0).then(|| result.value(0));
818            assert_eq!(actual, expect);
819        }
820    }
821
822    #[test]
823    fn test_json_get_float() {
824        let json_get_float = JsonGetFloat::default();
825
826        assert_eq!("json_get_float", json_get_float.name());
827        assert_eq!(
828            DataType::Float64,
829            json_get_float
830                .return_type(&[DataType::Binary, DataType::Utf8])
831                .unwrap()
832        );
833
834        let json_strings = [
835            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
836            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
837            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
838        ];
839        let json_struct = test_json_struct();
840
841        let path_expects = vec![
842            ("$.a.b", Some(2.1)),
843            ("$.a", Some(4.4)),
844            ("$.c", None),
845            ("$.kind", None),
846            ("$.payload.code", None),
847            ("$.payload.success", None),
848            ("$.payload.result.time_cost", Some(1.234)),
849            ("$.payload.not-exists", None),
850            ("$.not-exists", None),
851            ("$", None),
852        ];
853
854        let mut jsons = json_strings
855            .iter()
856            .map(|s| {
857                let value = jsonb::parse_value(s.as_bytes()).unwrap();
858                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
859            })
860            .collect::<Vec<_>>();
861        let json_struct_arrays =
862            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
863        jsons.extend(json_struct_arrays);
864
865        for i in 0..jsons.len() {
866            let json = &jsons[i];
867            let (path, expect) = path_expects[i];
868
869            let args = ScalarFunctionArgs {
870                args: vec![
871                    ColumnarValue::Array(json.clone()),
872                    ColumnarValue::Scalar(path.into()),
873                ],
874                arg_fields: vec![],
875                number_rows: 1,
876                return_field: Arc::new(Field::new("x", DataType::Float64, false)),
877                config_options: Arc::new(Default::default()),
878            };
879            let result = json_get_float
880                .invoke_with_args(args)
881                .and_then(|x| x.to_array(1))
882                .unwrap();
883
884            let result = result.as_primitive::<Float64Type>();
885            assert_eq!(1, result.len());
886            let actual = result.is_valid(0).then(|| result.value(0));
887            assert_eq!(actual, expect);
888        }
889    }
890
891    #[test]
892    fn test_json_get_bool() {
893        let json_get_bool = JsonGetBool::default();
894
895        assert_eq!("json_get_bool", json_get_bool.name());
896        assert_eq!(
897            DataType::Boolean,
898            json_get_bool
899                .return_type(&[DataType::Binary, DataType::Utf8])
900                .unwrap()
901        );
902
903        let json_strings = [
904            r#"{"a": {"b": true}, "b": false, "c": true}"#,
905            r#"{"a": false, "b": {"c": true}, "c": false}"#,
906            r#"{"a": true, "b": false, "c": {"a": true}}"#,
907        ];
908        let json_struct = test_json_struct();
909
910        let path_expects = vec![
911            ("$.a.b", Some(true)),
912            ("$.a", Some(false)),
913            ("$.c", None),
914            ("$.kind", None),
915            ("$.payload.code", None),
916            ("$.payload.success", Some(false)),
917            ("$.payload.result.time_cost", None),
918            ("$.payload.not-exists", None),
919            ("$.not-exists", None),
920            ("$", None),
921        ];
922
923        let mut jsons = json_strings
924            .iter()
925            .map(|s| {
926                let value = jsonb::parse_value(s.as_bytes()).unwrap();
927                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
928            })
929            .collect::<Vec<_>>();
930        let json_struct_arrays =
931            std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
932        jsons.extend(json_struct_arrays);
933
934        for i in 0..jsons.len() {
935            let json = &jsons[i];
936            let (path, expect) = path_expects[i];
937
938            let args = ScalarFunctionArgs {
939                args: vec![
940                    ColumnarValue::Array(json.clone()),
941                    ColumnarValue::Scalar(path.into()),
942                ],
943                arg_fields: vec![],
944                number_rows: 1,
945                return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
946                config_options: Arc::new(Default::default()),
947            };
948            let result = json_get_bool
949                .invoke_with_args(args)
950                .and_then(|x| x.to_array(1))
951                .unwrap();
952
953            let result = result.as_boolean();
954            assert_eq!(1, result.len());
955            let actual = result.is_valid(0).then(|| result.value(0));
956            assert_eq!(actual, expect);
957        }
958    }
959
960    #[test]
961    fn test_json_get_string() {
962        let json_get_string = JsonGetString::default();
963
964        assert_eq!("json_get_string", json_get_string.name());
965        assert_eq!(
966            DataType::Utf8View,
967            json_get_string
968                .return_type(&[DataType::Binary, DataType::Utf8])
969                .unwrap()
970        );
971
972        let json_strings = [
973            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
974            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
975            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
976        ];
977        let json_struct = test_json_struct();
978
979        let paths = vec![
980            "$.a.b",
981            "$.a",
982            "",
983            "$.kind",
984            "$.payload.code",
985            "$.payload.result.time_cost",
986            "$.payload",
987            "$.payload.success",
988            "$.payload.result",
989            "$.payload.result.error",
990            "$.payload.result.not-exists",
991            "$.payload.not-exists",
992            "$.not-exists",
993            "$",
994        ];
995        let expects = [
996            Some("a"),
997            Some("d"),
998            None,
999            Some("foo"),
1000            Some("404"),
1001            Some("1.234"),
1002            Some(
1003                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
1004            ),
1005            Some("false"),
1006            Some(r#"{"error":"not found","time_cost":1.234}"#),
1007            Some("not found"),
1008            None,
1009            None,
1010            None,
1011            Some(
1012                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1013            ),
1014        ];
1015
1016        let mut jsons = json_strings
1017            .iter()
1018            .map(|s| {
1019                let value = jsonb::parse_value(s.as_bytes()).unwrap();
1020                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1021            })
1022            .collect::<Vec<_>>();
1023        let json_struct_arrays =
1024            std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1025        jsons.extend(json_struct_arrays);
1026
1027        for i in 0..jsons.len() {
1028            let json = &jsons[i];
1029            let path = paths[i];
1030            let expect = expects[i];
1031
1032            let args = ScalarFunctionArgs {
1033                args: vec![
1034                    ColumnarValue::Array(json.clone()),
1035                    ColumnarValue::Scalar(path.into()),
1036                ],
1037                arg_fields: vec![],
1038                number_rows: 1,
1039                return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1040                config_options: Arc::new(Default::default()),
1041            };
1042            let result = json_get_string
1043                .invoke_with_args(args)
1044                .and_then(|x| x.to_array(1))
1045                .unwrap();
1046
1047            let result = result.as_string_view();
1048            assert_eq!(1, result.len());
1049            let actual = result.is_valid(0).then(|| result.value(0));
1050            assert_eq!(actual, expect);
1051        }
1052    }
1053
1054    #[test]
1055    fn test_json_get_object() -> Result<()> {
1056        let udf = JsonGetObject::default();
1057        assert_eq!("json_get_object", udf.name());
1058        assert_eq!(
1059            DataType::BinaryView,
1060            udf.return_type(&[DataType::BinaryView, DataType::Utf8View])?
1061        );
1062
1063        let json_value = parse_string_to_jsonb(r#"{"a": {"b": {"c": {"d": 1}}}}"#).unwrap();
1064        let paths = vec!["$", "$.a", "$.a.b", "$.a.b.c", "$.a.b.c.d", "$.e", "$.a.e"];
1065        let number_rows = paths.len();
1066
1067        let args = ScalarFunctionArgs {
1068            args: vec![
1069                ColumnarValue::Scalar(ScalarValue::Binary(Some(json_value))),
1070                ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
1071            ],
1072            arg_fields: vec![],
1073            number_rows,
1074            return_field: Arc::new(Field::new("x", DataType::Binary, false)),
1075            config_options: Arc::new(Default::default()),
1076        };
1077        let result = udf
1078            .invoke_with_args(args)
1079            .and_then(|x| x.to_array(number_rows))?;
1080        let result = result.as_binary_view();
1081
1082        let expected = &BinaryViewArray::from_iter(
1083            vec![
1084                Some(r#"{"a": {"b": {"c": {"d": 1}}}}"#),
1085                Some(r#"{"b": {"c": {"d": 1}}}"#),
1086                Some(r#"{"c": {"d": 1}}"#),
1087                Some(r#"{"d": 1}"#),
1088                None,
1089                None,
1090                None,
1091            ]
1092            .into_iter()
1093            .map(|x| x.and_then(|s| parse_string_to_jsonb(s).ok())),
1094        );
1095        assert_eq!(result, expected);
1096        Ok(())
1097    }
1098
1099    #[test]
1100    fn test_json_get_with_type() {
1101        let json_get_with_type = JsonGetWithType::default();
1102
1103        assert_eq!("json_get", json_get_with_type.name());
1104
1105        let json_strings = [
1106            r#"{"a": {"b": "a"}, "b": "b", "c": "c"}"#,
1107            r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
1108            r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
1109        ];
1110        let json_struct = test_json_struct();
1111
1112        let paths = vec![
1113            "$.a.b",
1114            "$.a",
1115            "",
1116            "$.kind",
1117            "$.payload.code",
1118            "$.payload.result.time_cost",
1119            "$.payload",
1120            "$.payload.success",
1121            "$.payload.result",
1122            "$.payload.result.error",
1123            "$.payload.result.not-exists",
1124            "$.payload.not-exists",
1125            "$.not-exists",
1126            "$",
1127        ];
1128        let expects = [
1129            Some("a"),
1130            Some("d"),
1131            None,
1132            Some("foo"),
1133            Some("404"),
1134            Some("1.234"),
1135            Some(
1136                r#"{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}"#,
1137            ),
1138            Some("false"),
1139            Some(r#"{"error":"not found","time_cost":1.234}"#),
1140            Some("not found"),
1141            None,
1142            None,
1143            None,
1144            Some(
1145                r#"{"kind":"foo","payload":{"code":404,"result":{"error":"not found","time_cost":1.234},"success":false}}"#,
1146            ),
1147        ];
1148
1149        let mut jsons = json_strings
1150            .iter()
1151            .map(|s| {
1152                let value = jsonb::parse_value(s.as_bytes()).unwrap();
1153                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1154            })
1155            .collect::<Vec<_>>();
1156        let json_struct_arrays =
1157            std::iter::repeat_n(json_struct, expects.len() - jsons.len()).collect::<Vec<_>>();
1158        jsons.extend(json_struct_arrays);
1159
1160        for i in 0..jsons.len() {
1161            let json = &jsons[i];
1162            let path = paths[i];
1163            let expect = expects[i];
1164
1165            let args = ScalarFunctionArgs {
1166                args: vec![
1167                    ColumnarValue::Array(json.clone()),
1168                    ColumnarValue::Scalar(path.into()),
1169                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("string".to_string()))),
1170                ],
1171                arg_fields: vec![],
1172                number_rows: 1,
1173                return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
1174                config_options: Arc::new(Default::default()),
1175            };
1176            let result = json_get_with_type
1177                .invoke_with_args(args)
1178                .and_then(|x| x.to_array(1))
1179                .unwrap();
1180
1181            let result = result.as_string_view();
1182            assert_eq!(1, result.len());
1183            let actual = result.is_valid(0).then(|| result.value(0));
1184            assert_eq!(actual, expect);
1185        }
1186
1187        let json_strings = [
1188            r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
1189            r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
1190            r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
1191        ];
1192        let paths = ["$.a.b", "$.a", "$.c", "$.payload.code"];
1193        let expects = [Some(2), Some(4), None, Some(404)];
1194
1195        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1196            let json = if i < json_strings.len() {
1197                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1198                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1199            } else {
1200                test_json_struct()
1201            };
1202
1203            let args = ScalarFunctionArgs {
1204                args: vec![
1205                    ColumnarValue::Array(json),
1206                    ColumnarValue::Scalar((*path).into()),
1207                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("int".to_string()))),
1208                ],
1209                arg_fields: vec![],
1210                number_rows: 1,
1211                return_field: Arc::new(Field::new("x", DataType::Int64, false)),
1212                config_options: Arc::new(Default::default()),
1213            };
1214            let result = json_get_with_type
1215                .invoke_with_args(args)
1216                .and_then(|x| x.to_array(1))
1217                .unwrap();
1218
1219            let result = result.as_primitive::<Int64Type>();
1220            assert_eq!(1, result.len());
1221            let actual = result.is_valid(0).then(|| result.value(0));
1222            assert_eq!(actual, *expect);
1223        }
1224
1225        let json_strings = [
1226            r#"{"a": {"b": 2.1}, "b": 2.2, "c": 3.3}"#,
1227            r#"{"a": 4.4, "b": {"c": 6.6}, "c": 6.6}"#,
1228            r#"{"a": 7.7, "b": 8.8, "c": {"a": 7.7}}"#,
1229        ];
1230        let paths = ["$.a.b", "$.a", "$.c", "$.payload.result.time_cost"];
1231        let expects = [Some(2.1), Some(4.4), None, Some(1.234)];
1232
1233        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1234            let json = if i < json_strings.len() {
1235                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1236                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1237            } else {
1238                test_json_struct()
1239            };
1240
1241            let args = ScalarFunctionArgs {
1242                args: vec![
1243                    ColumnarValue::Array(json),
1244                    ColumnarValue::Scalar((*path).into()),
1245                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("float".to_string()))),
1246                ],
1247                arg_fields: vec![],
1248                number_rows: 1,
1249                return_field: Arc::new(Field::new("x", DataType::Float64, false)),
1250                config_options: Arc::new(Default::default()),
1251            };
1252            let result = json_get_with_type
1253                .invoke_with_args(args)
1254                .and_then(|x| x.to_array(1))
1255                .unwrap();
1256
1257            let result = result.as_primitive::<Float64Type>();
1258            assert_eq!(1, result.len());
1259            let actual = result.is_valid(0).then(|| result.value(0));
1260            assert_eq!(actual, *expect);
1261        }
1262
1263        let json_strings = [
1264            r#"{"a": {"b": true}, "b": false, "c": true}"#,
1265            r#"{"a": false, "b": {"c": true}, "c": false}"#,
1266            r#"{"a": true, "b": false, "c": {"a": true}}"#,
1267        ];
1268        let paths = ["$.a.b", "$.a", "$.c", "$.payload.success"];
1269        let expects = [Some(true), Some(false), None, Some(false)];
1270
1271        for (i, (path, expect)) in paths.iter().zip(expects.iter()).enumerate() {
1272            let json = if i < json_strings.len() {
1273                let value = jsonb::parse_value(json_strings[i].as_bytes()).unwrap();
1274                Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
1275            } else {
1276                test_json_struct()
1277            };
1278
1279            let args = ScalarFunctionArgs {
1280                args: vec![
1281                    ColumnarValue::Array(json),
1282                    ColumnarValue::Scalar((*path).into()),
1283                    ColumnarValue::Scalar(ScalarValue::Utf8(Some("bool".to_string()))),
1284                ],
1285                arg_fields: vec![],
1286                number_rows: 1,
1287                return_field: Arc::new(Field::new("x", DataType::Boolean, false)),
1288                config_options: Arc::new(Default::default()),
1289            };
1290            let result = json_get_with_type
1291                .invoke_with_args(args)
1292                .and_then(|x| x.to_array(1))
1293                .unwrap();
1294
1295            let result = result.as_boolean();
1296            assert_eq!(1, result.len());
1297            let actual = result.is_valid(0).then(|| result.value(0));
1298            assert_eq!(actual, *expect);
1299        }
1300    }
1301}