Skip to main content

datatypes/vectors/json/
array.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::builder::{
21    ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, NullBuilder, StringViewBuilder,
22    make_builder,
23};
24use arrow_array::cast::AsArray;
25use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
26use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
27use arrow_schema::{DataType, FieldRef};
28use serde_json::Value;
29use snafu::{OptionExt, ResultExt};
30
31use crate::arrow_array::{
32    MutableBinaryArray, StringViewArray, binary_array_value, string_array_value,
33};
34use crate::error::{
35    AlignJsonArraySnafu, ArrowComputeSnafu, CastTypeSnafu, DeserializeSnafu, InvalidJsonSnafu,
36    Result, SerializeSnafu,
37};
38
/// A borrowed view over an Arrow array that is interpreted as JSON data.
///
/// The view never copies the underlying array; it only adds JSON-oriented helpers such as
/// reading a row as a [Value] or aligning the array to an expected data type.
pub struct JsonArray<'a> {
    /// The wrapped Arrow array.
    inner: &'a ArrayRef,
}
42
43impl JsonArray<'_> {
44    /// Try to get the value (as a [Value]) at the index `i`.
45    pub fn try_get_value(&self, i: usize) -> Result<Value> {
46        let array = self.inner;
47        if array.is_null(i) {
48            return Ok(Value::Null);
49        }
50
51        let value = match array.data_type() {
52            DataType::Null => Value::Null,
53            DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
54            DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
55            DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
56            DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
57            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
58                Value::String(string_array_value(array, i).to_string())
59            }
60            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
61                let bytes = binary_array_value(array, i);
62                serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
63                    json: String::from_utf8_lossy(bytes),
64                })?
65            }
66            DataType::Struct(_) => {
67                let structs = array.as_struct();
68                let object = structs
69                    .fields()
70                    .iter()
71                    .zip(structs.columns())
72                    .map(|(field, column)| {
73                        JsonArray::from(column)
74                            .try_get_value(i)
75                            .map(|v| (field.name().clone(), v))
76                    })
77                    .collect::<Result<_>>()?;
78                Value::Object(object)
79            }
80            DataType::List(_) => {
81                let lists = array.as_list::<i32>();
82                let list = lists.value(i);
83                let list = JsonArray::from(&list);
84                let mut values = Vec::with_capacity(list.inner.len());
85                for i in 0..list.inner.len() {
86                    values.push(list.try_get_value(i)?);
87                }
88                Value::Array(values)
89            }
90            t => {
91                return InvalidJsonSnafu {
92                    value: format!("unknown JSON type {t}"),
93                }
94                .fail();
95            }
96        };
97        Ok(value)
98    }
99
100    /// Align a JSON array to the `expect` data type. The alignment mostly does three things:
101    ///
102    /// 1. set the missing fields with null arrays;
103    /// 2. discard the fields that are not in the `expect` data type;
104    /// 3. cast the fields to the ones with same names in the `expect` if their data types are not
105    ///    matched.
106    pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
107        if self.inner.data_type() == expect {
108            return Ok(self.inner.clone());
109        }
110
111        common_telemetry::trace!(
112            "Try aligning JSON array {} to data type {}",
113            self.inner.data_type(),
114            expect
115        );
116
117        let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
118            reason: "expect struct array",
119        })?;
120        let array_fields = struct_array.fields();
121        let array_columns = struct_array.columns();
122        let DataType::Struct(expect_fields) = expect else {
123            return AlignJsonArraySnafu {
124                reason: "expect struct datatype",
125            }
126            .fail();
127        };
128        let mut aligned = Vec::with_capacity(expect_fields.len());
129
130        // Compare the fields in the JSON array and the to-be-aligned schema, amending with null
131        // arrays on the way. It's very important to note that fields in the JSON array and those
132        // in the JSON type are both **SORTED**, which can be guaranteed because the fields in the
133        // JSON type implementation are sorted.
134        debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
135        debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
136
137        let mut i = 0; // point to the expect fields
138        let mut j = 0; // point to the array fields
139        while i < expect_fields.len() && j < array_fields.len() {
140            let expect_field = &expect_fields[i];
141            let array_field = &array_fields[j];
142            match expect_field.name().cmp(array_field.name()) {
143                Ordering::Equal => {
144                    if expect_field.data_type() == array_field.data_type() {
145                        aligned.push(array_columns[j].clone());
146                    } else {
147                        let expect_type = expect_field.data_type();
148                        let array_type = array_field.data_type();
149                        let array = match (expect_type, array_type) {
150                            (DataType::Struct(_), DataType::Struct(_)) => {
151                                JsonArray::from(&array_columns[j]).try_align(expect_type)?
152                            }
153                            (DataType::List(expect_item), DataType::List(array_item)) => {
154                                let list_array = array_columns[j].as_list::<i32>();
155                                try_align_list(list_array, expect_item, array_item)?
156                            }
157                            _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
158                        };
159                        aligned.push(array);
160                    }
161                    i += 1;
162                    j += 1;
163                }
164                Ordering::Less => {
165                    aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
166                    i += 1;
167                }
168                Ordering::Greater => {
169                    j += 1;
170                }
171            }
172        }
173        if i < expect_fields.len() {
174            for field in &expect_fields[i..] {
175                aligned.push(new_null_array(field.data_type(), struct_array.len()));
176            }
177        }
178
179        let json_array = StructArray::try_new(
180            expect_fields.clone(),
181            aligned,
182            struct_array.nulls().cloned(),
183        )
184        .map_err(|e| {
185            AlignJsonArraySnafu {
186                reason: e.to_string(),
187            }
188            .build()
189        })?;
190        Ok(Arc::new(json_array))
191    }
192
193    fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
194        let from_type = self.inner.data_type();
195        if from_type == to_type {
196            return Ok(self.inner.clone());
197        }
198
199        if from_type.is_binary() && !to_type.is_binary() {
200            return self.decode_variant(to_type);
201        }
202
203        if !from_type.is_binary() && to_type.is_binary() {
204            return self.encode_variant();
205        }
206
207        if compute::can_cast_types(from_type, to_type) {
208            return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
209        }
210
211        let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
212            .context(ArrowComputeSnafu)?;
213        let values = (0..self.inner.len())
214            .map(|i| {
215                self.inner
216                    .is_valid(i)
217                    .then(|| formatter.value(i).to_string())
218            })
219            .collect::<Vec<_>>();
220        Ok(Arc::new(StringViewArray::from(values)))
221    }
222
223    fn encode_variant(&self) -> Result<ArrayRef> {
224        let len = self.inner.len();
225        let mut encoded = Vec::with_capacity(len);
226        let mut total_bytes = 0;
227
228        for i in 0..len {
229            let value = self.try_get_value(i)?;
230            if value.is_null() {
231                encoded.push(None);
232            } else {
233                let bytes = serde_json::to_vec(&value).context(SerializeSnafu)?;
234                total_bytes += bytes.len();
235                encoded.push(Some(bytes));
236            }
237        }
238
239        let mut builder = MutableBinaryArray::with_capacity(len, total_bytes);
240        for value in encoded {
241            builder.append_option(value);
242        }
243        Ok(Arc::new(builder.finish()))
244    }
245
246    fn decode_variant(&self, to_type: &DataType) -> Result<ArrayRef> {
247        fn downcast_builder<'a, T: ArrayBuilder>(
248            builder: &'a mut dyn ArrayBuilder,
249            to_type: &DataType,
250        ) -> Result<&'a mut T> {
251            builder
252                .as_any_mut()
253                .downcast_mut::<T>()
254                .with_context(|| CastTypeSnafu {
255                    msg: format!("Expect ArrayBuilder is of type {to_type}"),
256                })
257        }
258
259        let mut builder = make_builder(to_type, self.inner.len());
260        if to_type.is_null() {
261            downcast_builder::<NullBuilder>(builder.as_mut(), to_type)?
262                .append_nulls(self.inner.len());
263        } else {
264            match to_type {
265                DataType::Boolean => {
266                    let b = downcast_builder::<BooleanBuilder>(builder.as_mut(), to_type)?;
267                    for i in 0..self.inner.len() {
268                        b.append_option(self.try_get_value(i)?.as_bool());
269                    }
270                }
271                DataType::Int64 => {
272                    let b = downcast_builder::<Int64Builder>(builder.as_mut(), to_type)?;
273                    for i in 0..self.inner.len() {
274                        b.append_option(self.try_get_value(i)?.as_i64());
275                    }
276                }
277                DataType::Float64 => {
278                    let b = downcast_builder::<Float64Builder>(builder.as_mut(), to_type)?;
279                    for i in 0..self.inner.len() {
280                        b.append_option(self.try_get_value(i)?.as_f64());
281                    }
282                }
283                DataType::Utf8View => {
284                    let b = downcast_builder::<StringViewBuilder>(builder.as_mut(), to_type)?;
285                    for i in 0..self.inner.len() {
286                        let v = self.try_get_value(i)?;
287                        if v.is_null() {
288                            b.append_null();
289                        } else if let Some(s) = v.as_str() {
290                            b.append_value(s);
291                        } else {
292                            b.append_value(v.to_string());
293                        }
294                    }
295                }
296                _ => {
297                    return CastTypeSnafu {
298                        msg: format!("Cannot cast JSON value to {to_type}"),
299                    }
300                    .fail();
301                }
302            }
303        }
304        Ok(builder.finish())
305    }
306}
307
308fn try_align_list(
309    list_array: &ListArray,
310    expect_item: &FieldRef,
311    array_item: &FieldRef,
312) -> Result<ArrayRef> {
313    let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
314        (DataType::Struct(_), DataType::Struct(_)) => {
315            JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
316        }
317        (DataType::List(expect_item), DataType::List(array_item)) => {
318            let list_array = list_array.values().as_list::<i32>();
319            try_align_list(list_array, expect_item, array_item)?
320        }
321        _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
322    };
323    Ok(Arc::new(
324        GenericListArray::<i32>::try_new(
325            expect_item.clone(),
326            list_array.offsets().clone(),
327            item_aligned,
328            list_array.nulls().cloned(),
329        )
330        .context(ArrowComputeSnafu)?,
331    ))
332}
333
334impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
335    fn from(inner: &'a ArrayRef) -> Self {
336        Self { inner }
337    }
338}
339
340#[cfg(test)]
341mod test {
342    use arrow_array::types::Int64Type;
343    use arrow_array::{
344        BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, StringArray,
345    };
346    use arrow_schema::{Field, Fields};
347    use serde_json::json;
348
349    use super::*;
350
351    #[test]
352    fn test_try_get_value() -> Result<()> {
353        let nulls = new_null_array(&DataType::Null, 2);
354        assert_eq!(JsonArray::from(&nulls).try_get_value(0)?, Value::Null);
355
356        let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
357        assert_eq!(JsonArray::from(&bools).try_get_value(0)?, json!(true));
358        assert_eq!(JsonArray::from(&bools).try_get_value(1)?, Value::Null);
359
360        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
361        assert_eq!(JsonArray::from(&ints).try_get_value(0)?, json!(-7));
362        assert_eq!(JsonArray::from(&ints).try_get_value(1)?, Value::Null);
363
364        let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
365        assert_eq!(JsonArray::from(&floats).try_get_value(0)?, json!(1.5));
366
367        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
368        assert_eq!(JsonArray::from(&strings).try_get_value(0)?, json!("hello"));
369        assert_eq!(JsonArray::from(&strings).try_get_value(1)?, Value::Null);
370
371        let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
372            br#"{"nested":[1,null,"x"]}"#.as_slice(),
373            b"null".as_slice(),
374        ]));
375        assert_eq!(
376            JsonArray::from(&binaries).try_get_value(0)?,
377            json!({"nested": [1, null, "x"]})
378        );
379        assert_eq!(JsonArray::from(&binaries).try_get_value(1)?, Value::Null);
380
381        let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
382            Some(vec![Some(1), None, Some(3)]),
383            None,
384        ]));
385        assert_eq!(
386            JsonArray::from(&lists).try_get_value(0)?,
387            json!([1, null, 3])
388        );
389        assert_eq!(JsonArray::from(&lists).try_get_value(1)?, Value::Null);
390
391        let structs: ArrayRef = Arc::new(StructArray::from(vec![
392            (
393                Arc::new(Field::new("flag", DataType::Boolean, true)),
394                Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
395            ),
396            (
397                Arc::new(Field::new_list(
398                    "items",
399                    Field::new_list_field(DataType::Int64, true),
400                    true,
401                )),
402                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
403                    Some(vec![Some(1), None]),
404                    Some(vec![Some(2)]),
405                ])) as ArrayRef,
406            ),
407        ]));
408        assert_eq!(
409            JsonArray::from(&structs).try_get_value(0)?,
410            json!({"flag": true, "items": [1, null]})
411        );
412        assert_eq!(
413            JsonArray::from(&structs).try_get_value(1)?,
414            json!({"flag": null, "items": [2]})
415        );
416
417        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
418        assert_eq!(
419            JsonArray::from(&unsupported)
420                .try_get_value(0)
421                .unwrap_err()
422                .to_string(),
423            "Invalid JSON: unknown JSON type Int32"
424        );
425
426        Ok(())
427    }
428
    /// Checks `try_align` against a set of struct arrays and target struct types.
    #[test]
    fn test_align_json_array() -> Result<()> {
        /// One alignment scenario: the input array, the target type, and the expected outcome
        /// (either the aligned array or the error message).
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            /// Build a case from the input struct array, the target fields, and the expected
            /// columns of the aligned struct (which is assembled without a null buffer).
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            /// Run the alignment and compare the outcome with the expectation. An unexpected
            /// error is propagated to the caller; an unexpected success panics.
            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // An empty (field-less) JSON array can be aligned with a complex JSON type: every
        // expected field is filled with nulls.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Simple alignment: the existing "float" column is kept and the missing "string"
        // column is filled with nulls.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Complex alignment: a missing top-level field ("bool"), an unchanged list, a nested
        // struct that gains a field ("float"), and an unchanged string column.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        Ok(())
    }
573}