1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::cast::AsArray;
21use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
22use arrow_schema::{DataType, FieldRef};
23use snafu::{OptionExt, ResultExt, ensure};
24
25use crate::arrow_array::StringArray;
26use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result};
27
28pub struct JsonArray<'a> {
29 inner: &'a ArrayRef,
30}
31
32impl JsonArray<'_> {
33 pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
42 if self.inner.data_type() == expect {
43 return Ok(self.inner.clone());
44 }
45
46 let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
47 reason: "expect struct array",
48 })?;
49 let array_fields = struct_array.fields();
50 let array_columns = struct_array.columns();
51 let DataType::Struct(expect_fields) = expect else {
52 return AlignJsonArraySnafu {
53 reason: "expect struct datatype",
54 }
55 .fail();
56 };
57 let mut aligned = Vec::with_capacity(expect_fields.len());
58
59 debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
64 debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
65
66 let mut i = 0; let mut j = 0; while i < expect_fields.len() && j < array_fields.len() {
69 let expect_field = &expect_fields[i];
70 let array_field = &array_fields[j];
71 match expect_field.name().cmp(array_field.name()) {
72 Ordering::Equal => {
73 if expect_field.data_type() == array_field.data_type() {
74 aligned.push(array_columns[j].clone());
75 } else {
76 let expect_type = expect_field.data_type();
77 let array_type = array_field.data_type();
78 let array = match (expect_type, array_type) {
79 (DataType::Struct(_), DataType::Struct(_)) => {
80 JsonArray::from(&array_columns[j]).try_align(expect_type)?
81 }
82 (DataType::List(expect_item), DataType::List(array_item)) => {
83 let list_array = array_columns[j].as_list::<i32>();
84 try_align_list(list_array, expect_item, array_item)?
85 }
86 _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
87 };
88 aligned.push(array);
89 }
90 i += 1;
91 j += 1;
92 }
93 Ordering::Less => {
94 aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
95 i += 1;
96 }
97 Ordering::Greater => {
98 return AlignJsonArraySnafu {
99 reason: format!("extra fields are found: [{}]", array_field.name()),
100 }
101 .fail();
102 }
103 }
104 }
105 if i < expect_fields.len() {
106 for field in &expect_fields[i..] {
107 aligned.push(new_null_array(field.data_type(), struct_array.len()));
108 }
109 }
110 ensure!(
111 j >= array_fields.len(),
112 AlignJsonArraySnafu {
113 reason: format!(
114 "extra fields are found: [{}]",
115 array_fields[j..]
116 .iter()
117 .map(|x| x.name().as_str())
118 .collect::<Vec<_>>()
119 .join(", ")
120 ),
121 }
122 );
123
124 let json_array = StructArray::try_new(
125 expect_fields.clone(),
126 aligned,
127 struct_array.nulls().cloned(),
128 )
129 .map_err(|e| {
130 AlignJsonArraySnafu {
131 reason: e.to_string(),
132 }
133 .build()
134 })?;
135 Ok(Arc::new(json_array))
136 }
137
138 fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
139 if compute::can_cast_types(self.inner.data_type(), to_type) {
140 return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
141 }
142
143 let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
145 .context(ArrowComputeSnafu)?;
146 let values = (0..self.inner.len())
147 .map(|i| {
148 self.inner
149 .is_valid(i)
150 .then(|| formatter.value(i).to_string())
151 })
152 .collect::<Vec<_>>();
153 Ok(Arc::new(StringArray::from(values)))
154 }
155}
156
157fn try_align_list(
158 list_array: &ListArray,
159 expect_item: &FieldRef,
160 array_item: &FieldRef,
161) -> Result<ArrayRef> {
162 let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
163 (DataType::Struct(_), DataType::Struct(_)) => {
164 JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
165 }
166 (DataType::List(expect_item), DataType::List(array_item)) => {
167 let list_array = list_array.values().as_list::<i32>();
168 try_align_list(list_array, expect_item, array_item)?
169 }
170 _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
171 };
172 Ok(Arc::new(
173 GenericListArray::<i32>::try_new(
174 expect_item.clone(),
175 list_array.offsets().clone(),
176 item_aligned,
177 list_array.nulls().cloned(),
178 )
179 .context(ArrowComputeSnafu)?,
180 ))
181}
182
183impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
184 fn from(inner: &'a ArrayRef) -> Self {
185 Self { inner }
186 }
187}
188
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
    use arrow_schema::{Field, Fields};

    use super::*;

    /// Exercises `JsonArray::try_align` on an empty struct, a struct with missing
    /// fields, a struct with a nested struct that is missing fields, and a struct
    /// carrying a field unknown to the expected type.
    #[test]
    fn test_align_json_array() -> Result<()> {
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            /// `expected` is either the columns of the aligned struct array or the
            /// full error message.
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                let expected = expected.map(|columns| {
                    Arc::new(StructArray::new(schema_type.clone(), columns, None)) as ArrayRef
                });
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type),
                    expected,
                }
            }

            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => {
                        assert_eq!(&json_array, &expected);
                        Ok(())
                    }
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => {
                        assert_eq!(e.to_string(), expected);
                        Ok(())
                    }
                    (Err(e), Ok(_)) => Err(e),
                }
            }
        }

        // Small fixtures shared by the cases below.
        let int_field = || Field::new("int", DataType::Int64, true);
        let float_field = || Field::new("float", DataType::Float64, true);
        let string_field = || Field::new("string", DataType::Utf8, true);
        let list_field =
            || Field::new_list("list", Field::new_list_field(DataType::Int64, true), true);

        // A struct array without any field gets a null column for every expected field.
        let bool_only = vec![Field::new("bool", DataType::Boolean, true)];
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                int_field(),
                Field::new_struct("nested", bool_only.clone(), true),
                string_field(),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(Fields::from(bool_only), 2)),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // A field missing from the array is appended as a null column.
        let floats = || -> ArrayRef { Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) };
        TestCase::new(
            StructArray::from(vec![(Arc::new(float_field()), floats())]),
            Fields::from(vec![float_field(), string_field()]),
            Ok(vec![floats(), Arc::new(StringArray::new_null(3))]),
        )
        .test()?;

        // Missing fields are filled in both at the top level and inside a nested struct,
        // while matching list and string columns pass through unchanged.
        let int_lists = || -> ArrayRef {
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1)]),
                None,
                Some(vec![Some(2), Some(3)]),
            ]))
        };
        let negative_ints = || -> ArrayRef { Arc::new(Int64Array::from(vec![-1, -2, -3])) };
        let letters = || -> ArrayRef { Arc::new(StringArray::from(vec!["a", "b", "c"])) };
        TestCase::new(
            StructArray::from(vec![
                (Arc::new(list_field()), int_lists()),
                (
                    Arc::new(Field::new_struct("nested", vec![int_field()], true)),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(int_field()),
                        negative_ints(),
                    )])) as ArrayRef,
                ),
                (Arc::new(string_field()), letters()),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                list_field(),
                Field::new_struct("nested", vec![float_field(), int_field()], true),
                string_field(),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                int_lists(),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(float_field()),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (Arc::new(int_field()), negative_ints()),
                ])),
                letters(),
            ]),
        )
        .test()?;

        // A field that the expected type does not know about is an error.
        TestCase::new(
            StructArray::try_from(vec![
                ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
                ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
            ])
            .unwrap(),
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err("Failed to align JSON array, reason: extra fields are found: [j]".to_string()),
        )
        .test()?;
        Ok(())
    }
}