Skip to main content

datatypes/vectors/json/
array.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::Arc;

use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::builder::{
    ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, NullBuilder, StringViewBuilder,
    UInt64Builder, make_builder,
};
use arrow_array::cast::AsArray;
use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
use arrow_schema::{DataType, FieldRef};
use common_telemetry::debug;
use serde_json::Value;
use snafu::{OptionExt, ResultExt};

use crate::arrow_array::{
    MutableBinaryArray, StringViewArray, binary_array_value, string_array_value,
};
use crate::error::{
    AlignJsonArraySnafu, ArrowComputeSnafu, CastTypeSnafu, DeserializeSnafu, InvalidJsonSnafu,
    Result, SerializeSnafu,
};

40pub struct JsonArray<'a> {
41    inner: &'a ArrayRef,
42}
43
44impl JsonArray<'_> {
45    /// Try to get the value (as a [Value]) at the index `i`.
46    pub fn try_get_value(&self, i: usize) -> Result<Value> {
47        let array = self.inner;
48        if array.is_null(i) {
49            return Ok(Value::Null);
50        }
51
52        let value = match array.data_type() {
53            DataType::Null => Value::Null,
54            DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
55            DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
56            DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
57            DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
58            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
59                Value::String(string_array_value(array, i).to_string())
60            }
61            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
62                let bytes = binary_array_value(array, i);
63                serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
64                    json: String::from_utf8_lossy(bytes),
65                })?
66            }
67            DataType::Struct(_) => {
68                let structs = array.as_struct();
69                let object = structs
70                    .fields()
71                    .iter()
72                    .zip(structs.columns())
73                    .map(|(field, column)| {
74                        JsonArray::from(column)
75                            .try_get_value(i)
76                            .map(|v| (field.name().clone(), v))
77                    })
78                    .collect::<Result<_>>()?;
79                Value::Object(object)
80            }
81            DataType::List(_) => {
82                let lists = array.as_list::<i32>();
83                let list = lists.value(i);
84                let list = JsonArray::from(&list);
85                let mut values = Vec::with_capacity(list.inner.len());
86                for i in 0..list.inner.len() {
87                    values.push(list.try_get_value(i)?);
88                }
89                Value::Array(values)
90            }
91            t => {
92                return InvalidJsonSnafu {
93                    value: format!("unknown JSON type {t}"),
94                }
95                .fail();
96            }
97        };
98        Ok(value)
99    }
100
101    /// Align a JSON array to the `expect` data type. The alignment mostly does three things:
102    ///
103    /// 1. set the missing fields with null arrays;
104    /// 2. discard the fields that are not in the `expect` data type;
105    /// 3. cast the fields to the ones with same names in the `expect` if their data types are not
106    ///    matched.
107    pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
108        if self.inner.data_type() == expect {
109            return Ok(self.inner.clone());
110        }
111
112        debug!(
113            "Try aligning JSON array {} to data type {}",
114            self.inner.data_type(),
115            expect
116        );
117
118        let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
119            reason: "expect struct array",
120        })?;
121        let array_fields = struct_array.fields();
122        let array_columns = struct_array.columns();
123        let DataType::Struct(expect_fields) = expect else {
124            return AlignJsonArraySnafu {
125                reason: "expect struct datatype",
126            }
127            .fail();
128        };
129        let mut aligned = Vec::with_capacity(expect_fields.len());
130
131        // Compare the fields in the JSON array and the to-be-aligned schema, amending with null
132        // arrays on the way. It's very important to note that fields in the JSON array and those
133        // in the JSON type are both **SORTED**, which can be guaranteed because the fields in the
134        // JSON type implementation are sorted.
135        debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
136        debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
137
138        let mut i = 0; // point to the expect fields
139        let mut j = 0; // point to the array fields
140        while i < expect_fields.len() && j < array_fields.len() {
141            let expect_field = &expect_fields[i];
142            let array_field = &array_fields[j];
143            match expect_field.name().cmp(array_field.name()) {
144                Ordering::Equal => {
145                    if expect_field.data_type() == array_field.data_type() {
146                        aligned.push(array_columns[j].clone());
147                    } else {
148                        let expect_type = expect_field.data_type();
149                        let array_type = array_field.data_type();
150                        let array = match (expect_type, array_type) {
151                            (DataType::Struct(_), DataType::Struct(_)) => {
152                                JsonArray::from(&array_columns[j]).try_align(expect_type)?
153                            }
154                            (DataType::List(expect_item), DataType::List(array_item)) => {
155                                let list_array = array_columns[j].as_list::<i32>();
156                                try_align_list(list_array, expect_item, array_item)?
157                            }
158                            _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
159                        };
160                        aligned.push(array);
161                    }
162                    i += 1;
163                    j += 1;
164                }
165                Ordering::Less => {
166                    aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
167                    i += 1;
168                }
169                Ordering::Greater => {
170                    j += 1;
171                }
172            }
173        }
174        if i < expect_fields.len() {
175            for field in &expect_fields[i..] {
176                aligned.push(new_null_array(field.data_type(), struct_array.len()));
177            }
178        }
179
180        let json_array = StructArray::try_new(
181            expect_fields.clone(),
182            aligned,
183            struct_array.nulls().cloned(),
184        )
185        .map_err(|e| {
186            AlignJsonArraySnafu {
187                reason: e.to_string(),
188            }
189            .build()
190        })?;
191        Ok(Arc::new(json_array))
192    }
193
194    fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
195        let from_type = self.inner.data_type();
196        if from_type == to_type {
197            return Ok(self.inner.clone());
198        }
199
200        if from_type.is_binary() && !to_type.is_binary() {
201            return self.decode_variant(to_type);
202        }
203
204        if !from_type.is_binary() && to_type.is_binary() {
205            return self.encode_variant();
206        }
207
208        if compute::can_cast_types(from_type, to_type) {
209            return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
210        }
211
212        let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
213            .context(ArrowComputeSnafu)?;
214        let values = (0..self.inner.len())
215            .map(|i| {
216                self.inner
217                    .is_valid(i)
218                    .then(|| formatter.value(i).to_string())
219            })
220            .collect::<Vec<_>>();
221        Ok(Arc::new(StringViewArray::from(values)))
222    }
223
224    fn encode_variant(&self) -> Result<ArrayRef> {
225        let len = self.inner.len();
226        let mut encoded = Vec::with_capacity(len);
227        let mut total_bytes = 0;
228
229        for i in 0..len {
230            let value = self.try_get_value(i)?;
231            if value.is_null() {
232                encoded.push(None);
233            } else {
234                let bytes = serde_json::to_vec(&value).context(SerializeSnafu)?;
235                total_bytes += bytes.len();
236                encoded.push(Some(bytes));
237            }
238        }
239
240        let mut builder = MutableBinaryArray::with_capacity(len, total_bytes);
241        for value in encoded {
242            builder.append_option(value);
243        }
244        Ok(Arc::new(builder.finish()))
245    }
246
247    fn decode_variant(&self, to_type: &DataType) -> Result<ArrayRef> {
248        fn downcast_builder<'a, T: ArrayBuilder>(
249            builder: &'a mut dyn ArrayBuilder,
250            to_type: &DataType,
251        ) -> Result<&'a mut T> {
252            builder
253                .as_any_mut()
254                .downcast_mut::<T>()
255                .with_context(|| CastTypeSnafu {
256                    msg: format!("Expect ArrayBuilder is of type {to_type}"),
257                })
258        }
259
260        let mut builder = make_builder(to_type, self.inner.len());
261        if to_type.is_null() {
262            downcast_builder::<NullBuilder>(builder.as_mut(), to_type)?
263                .append_nulls(self.inner.len());
264        } else {
265            match to_type {
266                DataType::Boolean => {
267                    let b = downcast_builder::<BooleanBuilder>(builder.as_mut(), to_type)?;
268                    for i in 0..self.inner.len() {
269                        b.append_option(self.try_get_value(i)?.as_bool());
270                    }
271                }
272                DataType::Int64 => {
273                    let b = downcast_builder::<Int64Builder>(builder.as_mut(), to_type)?;
274                    for i in 0..self.inner.len() {
275                        b.append_option(self.try_get_value(i)?.as_i64());
276                    }
277                }
278                DataType::Float64 => {
279                    let b = downcast_builder::<Float64Builder>(builder.as_mut(), to_type)?;
280                    for i in 0..self.inner.len() {
281                        b.append_option(self.try_get_value(i)?.as_f64());
282                    }
283                }
284                DataType::Utf8View => {
285                    let b = downcast_builder::<StringViewBuilder>(builder.as_mut(), to_type)?;
286                    for i in 0..self.inner.len() {
287                        let v = self.try_get_value(i)?;
288                        if v.is_null() {
289                            b.append_null();
290                        } else if let Some(s) = v.as_str() {
291                            b.append_value(s);
292                        } else {
293                            b.append_value(v.to_string());
294                        }
295                    }
296                }
297                _ => {
298                    return CastTypeSnafu {
299                        msg: format!("Cannot cast JSON value to {to_type}"),
300                    }
301                    .fail();
302                }
303            }
304        }
305        Ok(builder.finish())
306    }
307}
308
309fn try_align_list(
310    list_array: &ListArray,
311    expect_item: &FieldRef,
312    array_item: &FieldRef,
313) -> Result<ArrayRef> {
314    let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
315        (DataType::Struct(_), DataType::Struct(_)) => {
316            JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
317        }
318        (DataType::List(expect_item), DataType::List(array_item)) => {
319            let list_array = list_array.values().as_list::<i32>();
320            try_align_list(list_array, expect_item, array_item)?
321        }
322        _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
323    };
324    Ok(Arc::new(
325        GenericListArray::<i32>::try_new(
326            expect_item.clone(),
327            list_array.offsets().clone(),
328            item_aligned,
329            list_array.nulls().cloned(),
330        )
331        .context(ArrowComputeSnafu)?,
332    ))
333}
334
335impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
336    fn from(inner: &'a ArrayRef) -> Self {
337        Self { inner }
338    }
339}
340
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{
        BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, StringArray,
    };
    use arrow_schema::{Field, Fields};
    use serde_json::json;

    use super::*;

    /// Checks `JsonArray::try_get_value` for every supported data type, for null slots, and for
    /// the error message reported on an unsupported type.
    #[test]
    fn test_try_get_value() -> Result<()> {
        let nulls = new_null_array(&DataType::Null, 2);
        assert_eq!(JsonArray::from(&nulls).try_get_value(0)?, Value::Null);

        let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
        assert_eq!(JsonArray::from(&bools).try_get_value(0)?, json!(true));
        assert_eq!(JsonArray::from(&bools).try_get_value(1)?, Value::Null);

        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
        assert_eq!(JsonArray::from(&ints).try_get_value(0)?, json!(-7));
        assert_eq!(JsonArray::from(&ints).try_get_value(1)?, Value::Null);

        let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
        assert_eq!(JsonArray::from(&floats).try_get_value(0)?, json!(1.5));

        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
        assert_eq!(JsonArray::from(&strings).try_get_value(0)?, json!("hello"));
        assert_eq!(JsonArray::from(&strings).try_get_value(1)?, Value::Null);

        // Binary values are serialized JSON documents. The second row is the valid document
        // `null` (not a null slot), which also decodes to `Value::Null`.
        let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
            br#"{"nested":[1,null,"x"]}"#.as_slice(),
            b"null".as_slice(),
        ]));
        assert_eq!(
            JsonArray::from(&binaries).try_get_value(0)?,
            json!({"nested": [1, null, "x"]})
        );
        assert_eq!(JsonArray::from(&binaries).try_get_value(1)?, Value::Null);

        // A null item inside a list becomes a JSON null element; a null list becomes JSON null.
        let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), None, Some(3)]),
            None,
        ]));
        assert_eq!(
            JsonArray::from(&lists).try_get_value(0)?,
            json!([1, null, 3])
        );
        assert_eq!(JsonArray::from(&lists).try_get_value(1)?, Value::Null);

        // Struct rows become JSON objects keyed by field name; a null child slot shows up as a
        // JSON null member.
        let structs: ArrayRef = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("flag", DataType::Boolean, true)),
                Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
            ),
            (
                Arc::new(Field::new_list(
                    "items",
                    Field::new_list_field(DataType::Int64, true),
                    true,
                )),
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1), None]),
                    Some(vec![Some(2)]),
                ])) as ArrayRef,
            ),
        ]));
        assert_eq!(
            JsonArray::from(&structs).try_get_value(0)?,
            json!({"flag": true, "items": [1, null]})
        );
        assert_eq!(
            JsonArray::from(&structs).try_get_value(1)?,
            json!({"flag": null, "items": [2]})
        );

        // `Int32` has no JSON mapping in `try_get_value`, so it must be reported as an error.
        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        assert_eq!(
            JsonArray::from(&unsupported)
                .try_get_value(0)
                .unwrap_err()
                .to_string(),
            "Invalid JSON: unknown JSON type Int32"
        );

        Ok(())
    }

    /// Checks `JsonArray::try_align` against expected struct arrays: missing fields are filled
    /// with nulls and nested structs are aligned recursively.
    #[test]
    fn test_align_json_array() -> Result<()> {
        // One alignment scenario: the input array, the target struct type, and either the
        // expected aligned array or the expected error message.
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            // `expected` lists the expected child columns. They are wrapped in a struct of
            // `schema_type` without a top-level null buffer, matching the inputs used below.
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            // Run the alignment and compare with the expectation. An unexpected error is
            // propagated to the test; an unexpected success panics.
            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // Test that a struct array without any fields can be aligned with a complex JSON type:
        // every expected field is filled with nulls.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Test simple JSON array alignment: the existing field is kept as-is and the missing
        // `string` field is filled with nulls.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Test complex JSON array alignment: a missing leading field (`bool`), an unchanged
        // list, a nested struct that gains a null `float` field, and an unchanged string.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        Ok(())
    }
}