Skip to main content

datatypes/vectors/json/
array.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::cast::AsArray;
21use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
22use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
23use arrow_schema::{DataType, FieldRef};
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
28use crate::error::{
29    AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
30};
31
/// A borrowed view over an Arrow array that is interpreted as JSON data.
///
/// The view does not own the array; it only keeps a reference to the [ArrayRef] it was created
/// from (see the `From<&ArrayRef>` conversion) and offers JSON-oriented operations on top of it,
/// such as reading a row as a [Value] or aligning the array to an expected data type.
pub struct JsonArray<'a> {
    /// The wrapped Arrow array.
    inner: &'a ArrayRef,
}
35
36impl JsonArray<'_> {
37    /// Try to get the value (as a [Value]) at the index `i`.
38    pub fn try_get_value(&self, i: usize) -> Result<Value> {
39        let array = self.inner;
40        if array.is_null(i) {
41            return Ok(Value::Null);
42        }
43
44        let value = match array.data_type() {
45            DataType::Null => Value::Null,
46            DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
47            DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
48            DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
49            DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
50            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
51                Value::String(string_array_value(array, i).to_string())
52            }
53            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
54                let bytes = binary_array_value(array, i);
55                serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
56                    json: String::from_utf8_lossy(bytes),
57                })?
58            }
59            DataType::Struct(_) => {
60                let structs = array.as_struct();
61                let object = structs
62                    .fields()
63                    .iter()
64                    .zip(structs.columns())
65                    .map(|(field, column)| {
66                        JsonArray::from(column)
67                            .try_get_value(i)
68                            .map(|v| (field.name().clone(), v))
69                    })
70                    .collect::<Result<_>>()?;
71                Value::Object(object)
72            }
73            DataType::List(_) => {
74                let lists = array.as_list::<i32>();
75                let list = lists.value(i);
76                let list = JsonArray::from(&list);
77                let mut values = Vec::with_capacity(list.inner.len());
78                for i in 0..list.inner.len() {
79                    values.push(list.try_get_value(i)?);
80                }
81                Value::Array(values)
82            }
83            t => {
84                return InvalidJsonSnafu {
85                    value: format!("unknown JSON type {t}"),
86                }
87                .fail();
88            }
89        };
90        Ok(value)
91    }
92
93    /// Align a JSON array to the `expect` data type. The alignment mostly does three things:
94    ///
95    /// 1. set the missing fields with null arrays;
96    /// 2. discard the fields that are not in the `expect` data type;
97    /// 3. cast the fields to the ones with same names in the `expect` if their data types are not
98    ///    matched.
99    pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
100        if self.inner.data_type() == expect {
101            return Ok(self.inner.clone());
102        }
103
104        let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
105            reason: "expect struct array",
106        })?;
107        let array_fields = struct_array.fields();
108        let array_columns = struct_array.columns();
109        let DataType::Struct(expect_fields) = expect else {
110            return AlignJsonArraySnafu {
111                reason: "expect struct datatype",
112            }
113            .fail();
114        };
115        let mut aligned = Vec::with_capacity(expect_fields.len());
116
117        // Compare the fields in the JSON array and the to-be-aligned schema, amending with null
118        // arrays on the way. It's very important to note that fields in the JSON array and those
119        // in the JSON type are both **SORTED**, which can be guaranteed because the fields in the
120        // JSON type implementation are sorted.
121        debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
122        debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
123
124        let mut i = 0; // point to the expect fields
125        let mut j = 0; // point to the array fields
126        while i < expect_fields.len() && j < array_fields.len() {
127            let expect_field = &expect_fields[i];
128            let array_field = &array_fields[j];
129            match expect_field.name().cmp(array_field.name()) {
130                Ordering::Equal => {
131                    if expect_field.data_type() == array_field.data_type() {
132                        aligned.push(array_columns[j].clone());
133                    } else {
134                        let expect_type = expect_field.data_type();
135                        let array_type = array_field.data_type();
136                        let array = match (expect_type, array_type) {
137                            (DataType::Struct(_), DataType::Struct(_)) => {
138                                JsonArray::from(&array_columns[j]).try_align(expect_type)?
139                            }
140                            (DataType::List(expect_item), DataType::List(array_item)) => {
141                                let list_array = array_columns[j].as_list::<i32>();
142                                try_align_list(list_array, expect_item, array_item)?
143                            }
144                            _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
145                        };
146                        aligned.push(array);
147                    }
148                    i += 1;
149                    j += 1;
150                }
151                Ordering::Less => {
152                    aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
153                    i += 1;
154                }
155                Ordering::Greater => {
156                    j += 1;
157                }
158            }
159        }
160        if i < expect_fields.len() {
161            for field in &expect_fields[i..] {
162                aligned.push(new_null_array(field.data_type(), struct_array.len()));
163            }
164        }
165
166        let json_array = StructArray::try_new(
167            expect_fields.clone(),
168            aligned,
169            struct_array.nulls().cloned(),
170        )
171        .map_err(|e| {
172            AlignJsonArraySnafu {
173                reason: e.to_string(),
174            }
175            .build()
176        })?;
177        Ok(Arc::new(json_array))
178    }
179
180    fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
181        if compute::can_cast_types(self.inner.data_type(), to_type) {
182            return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
183        }
184
185        // TODO(LFC): Cast according to `to_type` instead of formatting to String here.
186        let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
187            .context(ArrowComputeSnafu)?;
188        let values = (0..self.inner.len())
189            .map(|i| {
190                self.inner
191                    .is_valid(i)
192                    .then(|| formatter.value(i).to_string())
193            })
194            .collect::<Vec<_>>();
195        Ok(Arc::new(StringArray::from(values)))
196    }
197}
198
199fn try_align_list(
200    list_array: &ListArray,
201    expect_item: &FieldRef,
202    array_item: &FieldRef,
203) -> Result<ArrayRef> {
204    let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
205        (DataType::Struct(_), DataType::Struct(_)) => {
206            JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
207        }
208        (DataType::List(expect_item), DataType::List(array_item)) => {
209            let list_array = list_array.values().as_list::<i32>();
210            try_align_list(list_array, expect_item, array_item)?
211        }
212        _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
213    };
214    Ok(Arc::new(
215        GenericListArray::<i32>::try_new(
216            expect_item.clone(),
217            list_array.offsets().clone(),
218            item_aligned,
219            list_array.nulls().cloned(),
220        )
221        .context(ArrowComputeSnafu)?,
222    ))
223}
224
225impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
226    fn from(inner: &'a ArrayRef) -> Self {
227        Self { inner }
228    }
229}
230
231#[cfg(test)]
232mod test {
233    use arrow_array::types::Int64Type;
234    use arrow_array::{BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray};
235    use arrow_schema::{Field, Fields};
236    use serde_json::json;
237
238    use super::*;
239
240    #[test]
241    fn test_try_get_value() -> Result<()> {
242        let nulls = new_null_array(&DataType::Null, 2);
243        assert_eq!(JsonArray::from(&nulls).try_get_value(0)?, Value::Null);
244
245        let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
246        assert_eq!(JsonArray::from(&bools).try_get_value(0)?, json!(true));
247        assert_eq!(JsonArray::from(&bools).try_get_value(1)?, Value::Null);
248
249        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
250        assert_eq!(JsonArray::from(&ints).try_get_value(0)?, json!(-7));
251        assert_eq!(JsonArray::from(&ints).try_get_value(1)?, Value::Null);
252
253        let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
254        assert_eq!(JsonArray::from(&floats).try_get_value(0)?, json!(1.5));
255
256        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
257        assert_eq!(JsonArray::from(&strings).try_get_value(0)?, json!("hello"));
258        assert_eq!(JsonArray::from(&strings).try_get_value(1)?, Value::Null);
259
260        let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
261            br#"{"nested":[1,null,"x"]}"#.as_slice(),
262            b"null".as_slice(),
263        ]));
264        assert_eq!(
265            JsonArray::from(&binaries).try_get_value(0)?,
266            json!({"nested": [1, null, "x"]})
267        );
268        assert_eq!(JsonArray::from(&binaries).try_get_value(1)?, Value::Null);
269
270        let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
271            Some(vec![Some(1), None, Some(3)]),
272            None,
273        ]));
274        assert_eq!(
275            JsonArray::from(&lists).try_get_value(0)?,
276            json!([1, null, 3])
277        );
278        assert_eq!(JsonArray::from(&lists).try_get_value(1)?, Value::Null);
279
280        let structs: ArrayRef = Arc::new(StructArray::from(vec![
281            (
282                Arc::new(Field::new("flag", DataType::Boolean, true)),
283                Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
284            ),
285            (
286                Arc::new(Field::new_list(
287                    "items",
288                    Field::new_list_field(DataType::Int64, true),
289                    true,
290                )),
291                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
292                    Some(vec![Some(1), None]),
293                    Some(vec![Some(2)]),
294                ])) as ArrayRef,
295            ),
296        ]));
297        assert_eq!(
298            JsonArray::from(&structs).try_get_value(0)?,
299            json!({"flag": true, "items": [1, null]})
300        );
301        assert_eq!(
302            JsonArray::from(&structs).try_get_value(1)?,
303            json!({"flag": null, "items": [2]})
304        );
305
306        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
307        assert_eq!(
308            JsonArray::from(&unsupported)
309                .try_get_value(0)
310                .unwrap_err()
311                .to_string(),
312            "Invalid JSON: unknown JSON type Int32"
313        );
314
315        Ok(())
316    }
317
    /// Checks `JsonArray::try_align` against a set of struct arrays and target struct types.
    #[test]
    fn test_align_json_array() -> Result<()> {
        /// One alignment scenario: the input array, the data type to align to, and the expected
        /// outcome (either the aligned array or the error message).
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            /// Builds a test case from a struct array, the fields of the target struct type, and
            /// the expected result. Expected columns are wrapped into a struct array that uses
            /// the target fields and has no null buffer.
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            /// Runs the alignment and compares the outcome with the expectation. An unexpected
            /// success panics; an unexpected error is returned to the caller.
            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // Test empty json array can be aligned with a complex json type.
        // The input has two rows but no fields, so every expected field becomes an all-null
        // column of length two.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Test simple json array alignment.
        // The existing "float" column is kept and the missing "string" column is filled with
        // nulls.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Test complex json array alignment.
        // The top-level "bool" field is missing and filled with nulls, "list" and "string" are
        // kept, and the "nested" struct is aligned recursively, gaining an all-null "float"
        // column next to its existing "int" column.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        Ok(())
    }
462}