1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::cast::AsArray;
21use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
22use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
23use arrow_schema::{DataType, FieldRef};
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::arrow_array::{StringArray, binary_array_value, string_array_value};
28use crate::error::{
29 AlignJsonArraySnafu, ArrowComputeSnafu, DeserializeSnafu, InvalidJsonSnafu, Result,
30};
31
32pub struct JsonArray<'a> {
33 inner: &'a ArrayRef,
34}
35
36impl JsonArray<'_> {
37 pub fn try_get_value(&self, i: usize) -> Result<Value> {
39 let array = self.inner;
40 if array.is_null(i) {
41 return Ok(Value::Null);
42 }
43
44 let value = match array.data_type() {
45 DataType::Null => Value::Null,
46 DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
47 DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
48 DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
49 DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
50 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
51 Value::String(string_array_value(array, i).to_string())
52 }
53 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
54 let bytes = binary_array_value(array, i);
55 serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
56 json: String::from_utf8_lossy(bytes),
57 })?
58 }
59 DataType::Struct(_) => {
60 let structs = array.as_struct();
61 let object = structs
62 .fields()
63 .iter()
64 .zip(structs.columns())
65 .map(|(field, column)| {
66 JsonArray::from(column)
67 .try_get_value(i)
68 .map(|v| (field.name().clone(), v))
69 })
70 .collect::<Result<_>>()?;
71 Value::Object(object)
72 }
73 DataType::List(_) => {
74 let lists = array.as_list::<i32>();
75 let list = lists.value(i);
76 let list = JsonArray::from(&list);
77 let mut values = Vec::with_capacity(list.inner.len());
78 for i in 0..list.inner.len() {
79 values.push(list.try_get_value(i)?);
80 }
81 Value::Array(values)
82 }
83 t => {
84 return InvalidJsonSnafu {
85 value: format!("unknown JSON type {t}"),
86 }
87 .fail();
88 }
89 };
90 Ok(value)
91 }
92
93 pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
100 if self.inner.data_type() == expect {
101 return Ok(self.inner.clone());
102 }
103
104 let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
105 reason: "expect struct array",
106 })?;
107 let array_fields = struct_array.fields();
108 let array_columns = struct_array.columns();
109 let DataType::Struct(expect_fields) = expect else {
110 return AlignJsonArraySnafu {
111 reason: "expect struct datatype",
112 }
113 .fail();
114 };
115 let mut aligned = Vec::with_capacity(expect_fields.len());
116
117 debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
122 debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
123
124 let mut i = 0; let mut j = 0; while i < expect_fields.len() && j < array_fields.len() {
127 let expect_field = &expect_fields[i];
128 let array_field = &array_fields[j];
129 match expect_field.name().cmp(array_field.name()) {
130 Ordering::Equal => {
131 if expect_field.data_type() == array_field.data_type() {
132 aligned.push(array_columns[j].clone());
133 } else {
134 let expect_type = expect_field.data_type();
135 let array_type = array_field.data_type();
136 let array = match (expect_type, array_type) {
137 (DataType::Struct(_), DataType::Struct(_)) => {
138 JsonArray::from(&array_columns[j]).try_align(expect_type)?
139 }
140 (DataType::List(expect_item), DataType::List(array_item)) => {
141 let list_array = array_columns[j].as_list::<i32>();
142 try_align_list(list_array, expect_item, array_item)?
143 }
144 _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
145 };
146 aligned.push(array);
147 }
148 i += 1;
149 j += 1;
150 }
151 Ordering::Less => {
152 aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
153 i += 1;
154 }
155 Ordering::Greater => {
156 j += 1;
157 }
158 }
159 }
160 if i < expect_fields.len() {
161 for field in &expect_fields[i..] {
162 aligned.push(new_null_array(field.data_type(), struct_array.len()));
163 }
164 }
165
166 let json_array = StructArray::try_new(
167 expect_fields.clone(),
168 aligned,
169 struct_array.nulls().cloned(),
170 )
171 .map_err(|e| {
172 AlignJsonArraySnafu {
173 reason: e.to_string(),
174 }
175 .build()
176 })?;
177 Ok(Arc::new(json_array))
178 }
179
180 fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
181 if compute::can_cast_types(self.inner.data_type(), to_type) {
182 return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
183 }
184
185 let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
187 .context(ArrowComputeSnafu)?;
188 let values = (0..self.inner.len())
189 .map(|i| {
190 self.inner
191 .is_valid(i)
192 .then(|| formatter.value(i).to_string())
193 })
194 .collect::<Vec<_>>();
195 Ok(Arc::new(StringArray::from(values)))
196 }
197}
198
199fn try_align_list(
200 list_array: &ListArray,
201 expect_item: &FieldRef,
202 array_item: &FieldRef,
203) -> Result<ArrayRef> {
204 let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
205 (DataType::Struct(_), DataType::Struct(_)) => {
206 JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
207 }
208 (DataType::List(expect_item), DataType::List(array_item)) => {
209 let list_array = list_array.values().as_list::<i32>();
210 try_align_list(list_array, expect_item, array_item)?
211 }
212 _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
213 };
214 Ok(Arc::new(
215 GenericListArray::<i32>::try_new(
216 expect_item.clone(),
217 list_array.offsets().clone(),
218 item_aligned,
219 list_array.nulls().cloned(),
220 )
221 .context(ArrowComputeSnafu)?,
222 ))
223}
224
225impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
226 fn from(inner: &'a ArrayRef) -> Self {
227 Self { inner }
228 }
229}
230
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray};
    use arrow_schema::{Field, Fields};
    use serde_json::json;

    use super::*;

    /// Checks `JsonArray::try_get_value` on each column type it handles, on null slots, on
    /// nested struct/list values, and on an unsupported column type.
    #[test]
    fn test_try_get_value() -> Result<()> {
        // A `Null`-typed column yields JSON null.
        let nulls = new_null_array(&DataType::Null, 2);
        assert_eq!(JsonArray::from(&nulls).try_get_value(0)?, Value::Null);

        // Scalars: a valid slot maps to the matching JSON scalar, a null slot to JSON null.
        let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
        assert_eq!(JsonArray::from(&bools).try_get_value(0)?, json!(true));
        assert_eq!(JsonArray::from(&bools).try_get_value(1)?, Value::Null);

        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
        assert_eq!(JsonArray::from(&ints).try_get_value(0)?, json!(-7));
        assert_eq!(JsonArray::from(&ints).try_get_value(1)?, Value::Null);

        let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
        assert_eq!(JsonArray::from(&floats).try_get_value(0)?, json!(1.5));

        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
        assert_eq!(JsonArray::from(&strings).try_get_value(0)?, json!("hello"));
        assert_eq!(JsonArray::from(&strings).try_get_value(1)?, Value::Null);

        // Binary slots hold serialized JSON documents that are parsed back. The literal
        // document `null` parses to JSON null even though the slot itself is valid.
        let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
            br#"{"nested":[1,null,"x"]}"#.as_slice(),
            b"null".as_slice(),
        ]));
        assert_eq!(
            JsonArray::from(&binaries).try_get_value(0)?,
            json!({"nested": [1, null, "x"]})
        );
        assert_eq!(JsonArray::from(&binaries).try_get_value(1)?, Value::Null);

        // List rows become JSON arrays (element nulls preserved); a null row is JSON null.
        let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), None, Some(3)]),
            None,
        ]));
        assert_eq!(
            JsonArray::from(&lists).try_get_value(0)?,
            json!([1, null, 3])
        );
        assert_eq!(JsonArray::from(&lists).try_get_value(1)?, Value::Null);

        // Struct rows become JSON objects keyed by field name; child values (including
        // nested lists and null children) are converted recursively.
        let structs: ArrayRef = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("flag", DataType::Boolean, true)),
                Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
            ),
            (
                Arc::new(Field::new_list(
                    "items",
                    Field::new_list_field(DataType::Int64, true),
                    true,
                )),
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1), None]),
                    Some(vec![Some(2)]),
                ])) as ArrayRef,
            ),
        ]));
        assert_eq!(
            JsonArray::from(&structs).try_get_value(0)?,
            json!({"flag": true, "items": [1, null]})
        );
        assert_eq!(
            JsonArray::from(&structs).try_get_value(1)?,
            json!({"flag": null, "items": [2]})
        );

        // Column types outside the handled set are rejected with an "Invalid JSON" error.
        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        assert_eq!(
            JsonArray::from(&unsupported)
                .try_get_value(0)
                .unwrap_err()
                .to_string(),
            "Invalid JSON: unknown JSON type Int32"
        );

        Ok(())
    }

    /// Checks `JsonArray::try_align` against expected aligned struct arrays.
    #[test]
    fn test_align_json_array() -> Result<()> {
        // One alignment scenario: the input array, the target type, and either the expected
        // aligned array or the expected error message.
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            // Builds a case whose expected output (when `Ok`) is a `StructArray` over
            // `schema_type` with the given child columns and no top-level null buffer.
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            // Runs the alignment and compares it with the expectation. Unexpected errors are
            // returned to the caller; unexpected successes panic.
            fn test(self) -> Result<()> {
                let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // A struct with no fields: every expected field, including the nested struct, is
        // filled with nulls for both rows.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // A field present in both is kept unchanged; a trailing expected field missing from
        // the array becomes all-null.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Mixed case: a leading expected field missing from the array (`bool`), an identical
        // list column, a nested struct that is itself missing a field (`float`, filled with
        // nulls by the recursive alignment), and an identical string column.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        Ok(())
    }
}