Skip to main content

datatypes/vectors/json/
array.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
22use arrow_schema::{DataType, FieldRef};
23use snafu::{OptionExt, ResultExt, ensure};
24
25use crate::arrow_array::StringArray;
26use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result};
27
28pub struct JsonArray<'a> {
29    inner: &'a ArrayRef,
30}
31
32impl JsonArray<'_> {
33    /// Align a JSON array to the `expect` data type. The `expect` data type is often the "largest"
34    /// JSON type after some insertions in the table schema, while the JSON array previously
35    /// written in the SST could be lagged behind it. So it's important to "align" the JSON array by
36    /// setting the missing fields with null arrays, or casting the data.
37    ///
38    /// It's an error if the to-be-aligned array contains extra fields that are not in the `expect`
39    /// data type. Forcing to align that kind of array will result in data loss, something we
40    /// generally not wanted.
41    pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
42        if self.inner.data_type() == expect {
43            return Ok(self.inner.clone());
44        }
45
46        let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
47            reason: "expect struct array",
48        })?;
49        let array_fields = struct_array.fields();
50        let array_columns = struct_array.columns();
51        let DataType::Struct(expect_fields) = expect else {
52            return AlignJsonArraySnafu {
53                reason: "expect struct datatype",
54            }
55            .fail();
56        };
57        let mut aligned = Vec::with_capacity(expect_fields.len());
58
59        // Compare the fields in the JSON array and the to-be-aligned schema, amending with null
60        // arrays on the way. It's very important to note that fields in the JSON array and those
61        // in the JSON type are both **SORTED**, which can be guaranteed because the fields in the
62        // JSON type implementation are sorted.
63        debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
64        debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
65
66        let mut i = 0; // point to the expect fields
67        let mut j = 0; // point to the array fields
68        while i < expect_fields.len() && j < array_fields.len() {
69            let expect_field = &expect_fields[i];
70            let array_field = &array_fields[j];
71            match expect_field.name().cmp(array_field.name()) {
72                Ordering::Equal => {
73                    if expect_field.data_type() == array_field.data_type() {
74                        aligned.push(array_columns[j].clone());
75                    } else {
76                        let expect_type = expect_field.data_type();
77                        let array_type = array_field.data_type();
78                        let array = match (expect_type, array_type) {
79                            (DataType::Struct(_), DataType::Struct(_)) => {
80                                JsonArray::from(&array_columns[j]).try_align(expect_type)?
81                            }
82                            (DataType::List(expect_item), DataType::List(array_item)) => {
83                                let list_array = array_columns[j].as_list::<i32>();
84                                try_align_list(list_array, expect_item, array_item)?
85                            }
86                            _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
87                        };
88                        aligned.push(array);
89                    }
90                    i += 1;
91                    j += 1;
92                }
93                Ordering::Less => {
94                    aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
95                    i += 1;
96                }
97                Ordering::Greater => {
98                    return AlignJsonArraySnafu {
99                        reason: format!("extra fields are found: [{}]", array_field.name()),
100                    }
101                    .fail();
102                }
103            }
104        }
105        if i < expect_fields.len() {
106            for field in &expect_fields[i..] {
107                aligned.push(new_null_array(field.data_type(), struct_array.len()));
108            }
109        }
110        ensure!(
111            j >= array_fields.len(),
112            AlignJsonArraySnafu {
113                reason: format!(
114                    "extra fields are found: [{}]",
115                    array_fields[j..]
116                        .iter()
117                        .map(|x| x.name().as_str())
118                        .collect::<Vec<_>>()
119                        .join(", ")
120                ),
121            }
122        );
123
124        let json_array = StructArray::try_new(
125            expect_fields.clone(),
126            aligned,
127            struct_array.nulls().cloned(),
128        )
129        .map_err(|e| {
130            AlignJsonArraySnafu {
131                reason: e.to_string(),
132            }
133            .build()
134        })?;
135        Ok(Arc::new(json_array))
136    }
137
138    fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
139        if compute::can_cast_types(self.inner.data_type(), to_type) {
140            return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
141        }
142
143        // TODO(LFC): Cast according to `to_type` instead of formatting to String here.
144        let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
145            .context(ArrowComputeSnafu)?;
146        let values = (0..self.inner.len())
147            .map(|i| {
148                self.inner
149                    .is_valid(i)
150                    .then(|| formatter.value(i).to_string())
151            })
152            .collect::<Vec<_>>();
153        Ok(Arc::new(StringArray::from(values)))
154    }
155}
156
157fn try_align_list(
158    list_array: &ListArray,
159    expect_item: &FieldRef,
160    array_item: &FieldRef,
161) -> Result<ArrayRef> {
162    let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
163        (DataType::Struct(_), DataType::Struct(_)) => {
164            JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
165        }
166        (DataType::List(expect_item), DataType::List(array_item)) => {
167            let list_array = list_array.values().as_list::<i32>();
168            try_align_list(list_array, expect_item, array_item)?
169        }
170        _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
171    };
172    Ok(Arc::new(
173        GenericListArray::<i32>::try_new(
174            expect_item.clone(),
175            list_array.offsets().clone(),
176            item_aligned,
177            list_array.nulls().cloned(),
178        )
179        .context(ArrowComputeSnafu)?,
180    ))
181}
182
183impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
184    fn from(inner: &'a ArrayRef) -> Self {
185        Self { inner }
186    }
187}
188
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
    use arrow_schema::{Field, Fields};

    use super::*;

    /// What a scenario expects from the alignment: the child columns of the aligned struct, or
    /// the message of the error.
    type Expectation = std::result::Result<Vec<ArrayRef>, String>;

    /// Aligns `json_array` to the struct type made of `schema_fields` and compares the outcome
    /// with `expectation`.
    ///
    /// An unexpected alignment error is returned to the caller; an unexpected success panics.
    fn check_align(
        json_array: StructArray,
        schema_fields: Fields,
        expectation: Expectation,
    ) -> Result<()> {
        let schema_type = DataType::Struct(schema_fields.clone());
        let expectation = expectation
            .map(|columns| Arc::new(StructArray::new(schema_fields, columns, None)) as ArrayRef);
        let input: ArrayRef = Arc::new(json_array);

        match JsonArray::from(&input).try_align(&schema_type) {
            Ok(json_array) => match expectation {
                Ok(expected) => assert_eq!(&json_array, &expected),
                Err(e) => panic!("expecting error {e} but actually get: {json_array:?}"),
            },
            Err(e) => match expectation {
                Err(expected) => assert_eq!(e.to_string(), expected),
                Ok(_) => return Err(e),
            },
        }
        Ok(())
    }

    #[test]
    fn test_align_json_array() -> Result<()> {
        // An empty json array can be aligned with a complex json type: every field is null.
        let nested_bool_fields =
            Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]);
        check_align(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(nested_bool_fields, 2)),
                Arc::new(StringArray::new_null(2)),
            ]),
        )?;

        // Simple json array alignment: one missing trailing field is filled with nulls.
        let floats_only = StructArray::from(vec![(
            Arc::new(Field::new("float", DataType::Float64, true)),
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
        )]);
        check_align(
            floats_only,
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )?;

        // Complex json array alignment: a missing leading field plus a nested struct that itself
        // lacks a field.
        let complex_input = StructArray::from(vec![
            (
                Arc::new(Field::new_list(
                    "list",
                    Field::new_list_field(DataType::Int64, true),
                    true,
                )),
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])) as ArrayRef,
            ),
            (
                Arc::new(Field::new_struct(
                    "nested",
                    vec![Field::new("int", DataType::Int64, true)],
                    true,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                )])),
            ),
            (
                Arc::new(Field::new("string", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ),
        ]);
        let complex_schema = Fields::from(vec![
            Field::new("bool", DataType::Boolean, true),
            Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("float", DataType::Float64, true),
                    Field::new("int", DataType::Int64, true),
                ],
                true,
            ),
            Field::new("string", DataType::Utf8, true),
        ]);
        let complex_expected: Vec<ArrayRef> = vec![
            Arc::new(BooleanArray::new_null(3)) as ArrayRef,
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1)]),
                None,
                Some(vec![Some(2), Some(3)]),
            ])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("float", DataType::Float64, true)),
                    Arc::new(Float64Array::new_null(3)) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])),
                ),
            ])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ];
        check_align(complex_input, complex_schema, Ok(complex_expected))?;

        // Alignment fails when the array carries a field the schema does not have.
        let with_extra_field = StructArray::try_from(vec![
            ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
            ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
        ])
        .unwrap();
        check_align(
            with_extra_field,
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err("Failed to align JSON array, reason: extra fields are found: [j]".to_string()),
        )?;

        Ok(())
    }
}