1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::builder::{
21 ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, NullBuilder, StringViewBuilder,
22 make_builder,
23};
24use arrow_array::cast::AsArray;
25use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
26use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
27use arrow_schema::{DataType, FieldRef};
28use serde_json::Value;
29use snafu::{OptionExt, ResultExt};
30
31use crate::arrow_array::{
32 MutableBinaryArray, StringViewArray, binary_array_value, string_array_value,
33};
34use crate::error::{
35 AlignJsonArraySnafu, ArrowComputeSnafu, CastTypeSnafu, DeserializeSnafu, InvalidJsonSnafu,
36 Result, SerializeSnafu,
37};
38
39pub struct JsonArray<'a> {
40 inner: &'a ArrayRef,
41}
42
43impl JsonArray<'_> {
44 pub fn try_get_value(&self, i: usize) -> Result<Value> {
46 let array = self.inner;
47 if array.is_null(i) {
48 return Ok(Value::Null);
49 }
50
51 let value = match array.data_type() {
52 DataType::Null => Value::Null,
53 DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
54 DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
55 DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
56 DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
57 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
58 Value::String(string_array_value(array, i).to_string())
59 }
60 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
61 let bytes = binary_array_value(array, i);
62 serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
63 json: String::from_utf8_lossy(bytes),
64 })?
65 }
66 DataType::Struct(_) => {
67 let structs = array.as_struct();
68 let object = structs
69 .fields()
70 .iter()
71 .zip(structs.columns())
72 .map(|(field, column)| {
73 JsonArray::from(column)
74 .try_get_value(i)
75 .map(|v| (field.name().clone(), v))
76 })
77 .collect::<Result<_>>()?;
78 Value::Object(object)
79 }
80 DataType::List(_) => {
81 let lists = array.as_list::<i32>();
82 let list = lists.value(i);
83 let list = JsonArray::from(&list);
84 let mut values = Vec::with_capacity(list.inner.len());
85 for i in 0..list.inner.len() {
86 values.push(list.try_get_value(i)?);
87 }
88 Value::Array(values)
89 }
90 t => {
91 return InvalidJsonSnafu {
92 value: format!("unknown JSON type {t}"),
93 }
94 .fail();
95 }
96 };
97 Ok(value)
98 }
99
100 pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
107 if self.inner.data_type() == expect {
108 return Ok(self.inner.clone());
109 }
110
111 common_telemetry::trace!(
112 "Try aligning JSON array {} to data type {}",
113 self.inner.data_type(),
114 expect
115 );
116
117 let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
118 reason: "expect struct array",
119 })?;
120 let array_fields = struct_array.fields();
121 let array_columns = struct_array.columns();
122 let DataType::Struct(expect_fields) = expect else {
123 return AlignJsonArraySnafu {
124 reason: "expect struct datatype",
125 }
126 .fail();
127 };
128 let mut aligned = Vec::with_capacity(expect_fields.len());
129
130 debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
135 debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
136
137 let mut i = 0; let mut j = 0; while i < expect_fields.len() && j < array_fields.len() {
140 let expect_field = &expect_fields[i];
141 let array_field = &array_fields[j];
142 match expect_field.name().cmp(array_field.name()) {
143 Ordering::Equal => {
144 if expect_field.data_type() == array_field.data_type() {
145 aligned.push(array_columns[j].clone());
146 } else {
147 let expect_type = expect_field.data_type();
148 let array_type = array_field.data_type();
149 let array = match (expect_type, array_type) {
150 (DataType::Struct(_), DataType::Struct(_)) => {
151 JsonArray::from(&array_columns[j]).try_align(expect_type)?
152 }
153 (DataType::List(expect_item), DataType::List(array_item)) => {
154 let list_array = array_columns[j].as_list::<i32>();
155 try_align_list(list_array, expect_item, array_item)?
156 }
157 _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
158 };
159 aligned.push(array);
160 }
161 i += 1;
162 j += 1;
163 }
164 Ordering::Less => {
165 aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
166 i += 1;
167 }
168 Ordering::Greater => {
169 j += 1;
170 }
171 }
172 }
173 if i < expect_fields.len() {
174 for field in &expect_fields[i..] {
175 aligned.push(new_null_array(field.data_type(), struct_array.len()));
176 }
177 }
178
179 let json_array = StructArray::try_new(
180 expect_fields.clone(),
181 aligned,
182 struct_array.nulls().cloned(),
183 )
184 .map_err(|e| {
185 AlignJsonArraySnafu {
186 reason: e.to_string(),
187 }
188 .build()
189 })?;
190 Ok(Arc::new(json_array))
191 }
192
193 fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
194 let from_type = self.inner.data_type();
195 if from_type == to_type {
196 return Ok(self.inner.clone());
197 }
198
199 if from_type.is_binary() && !to_type.is_binary() {
200 return self.decode_variant(to_type);
201 }
202
203 if !from_type.is_binary() && to_type.is_binary() {
204 return self.encode_variant();
205 }
206
207 if compute::can_cast_types(from_type, to_type) {
208 return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
209 }
210
211 let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
212 .context(ArrowComputeSnafu)?;
213 let values = (0..self.inner.len())
214 .map(|i| {
215 self.inner
216 .is_valid(i)
217 .then(|| formatter.value(i).to_string())
218 })
219 .collect::<Vec<_>>();
220 Ok(Arc::new(StringViewArray::from(values)))
221 }
222
223 fn encode_variant(&self) -> Result<ArrayRef> {
224 let len = self.inner.len();
225 let mut encoded = Vec::with_capacity(len);
226 let mut total_bytes = 0;
227
228 for i in 0..len {
229 let value = self.try_get_value(i)?;
230 if value.is_null() {
231 encoded.push(None);
232 } else {
233 let bytes = serde_json::to_vec(&value).context(SerializeSnafu)?;
234 total_bytes += bytes.len();
235 encoded.push(Some(bytes));
236 }
237 }
238
239 let mut builder = MutableBinaryArray::with_capacity(len, total_bytes);
240 for value in encoded {
241 builder.append_option(value);
242 }
243 Ok(Arc::new(builder.finish()))
244 }
245
246 fn decode_variant(&self, to_type: &DataType) -> Result<ArrayRef> {
247 fn downcast_builder<'a, T: ArrayBuilder>(
248 builder: &'a mut dyn ArrayBuilder,
249 to_type: &DataType,
250 ) -> Result<&'a mut T> {
251 builder
252 .as_any_mut()
253 .downcast_mut::<T>()
254 .with_context(|| CastTypeSnafu {
255 msg: format!("Expect ArrayBuilder is of type {to_type}"),
256 })
257 }
258
259 let mut builder = make_builder(to_type, self.inner.len());
260 if to_type.is_null() {
261 downcast_builder::<NullBuilder>(builder.as_mut(), to_type)?
262 .append_nulls(self.inner.len());
263 } else {
264 match to_type {
265 DataType::Boolean => {
266 let b = downcast_builder::<BooleanBuilder>(builder.as_mut(), to_type)?;
267 for i in 0..self.inner.len() {
268 b.append_option(self.try_get_value(i)?.as_bool());
269 }
270 }
271 DataType::Int64 => {
272 let b = downcast_builder::<Int64Builder>(builder.as_mut(), to_type)?;
273 for i in 0..self.inner.len() {
274 b.append_option(self.try_get_value(i)?.as_i64());
275 }
276 }
277 DataType::Float64 => {
278 let b = downcast_builder::<Float64Builder>(builder.as_mut(), to_type)?;
279 for i in 0..self.inner.len() {
280 b.append_option(self.try_get_value(i)?.as_f64());
281 }
282 }
283 DataType::Utf8View => {
284 let b = downcast_builder::<StringViewBuilder>(builder.as_mut(), to_type)?;
285 for i in 0..self.inner.len() {
286 let v = self.try_get_value(i)?;
287 if v.is_null() {
288 b.append_null();
289 } else if let Some(s) = v.as_str() {
290 b.append_value(s);
291 } else {
292 b.append_value(v.to_string());
293 }
294 }
295 }
296 _ => {
297 return CastTypeSnafu {
298 msg: format!("Cannot cast JSON value to {to_type}"),
299 }
300 .fail();
301 }
302 }
303 }
304 Ok(builder.finish())
305 }
306}
307
308fn try_align_list(
309 list_array: &ListArray,
310 expect_item: &FieldRef,
311 array_item: &FieldRef,
312) -> Result<ArrayRef> {
313 let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
314 (DataType::Struct(_), DataType::Struct(_)) => {
315 JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
316 }
317 (DataType::List(expect_item), DataType::List(array_item)) => {
318 let list_array = list_array.values().as_list::<i32>();
319 try_align_list(list_array, expect_item, array_item)?
320 }
321 _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
322 };
323 Ok(Arc::new(
324 GenericListArray::<i32>::try_new(
325 expect_item.clone(),
326 list_array.offsets().clone(),
327 item_aligned,
328 list_array.nulls().cloned(),
329 )
330 .context(ArrowComputeSnafu)?,
331 ))
332}
333
334impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
335 fn from(inner: &'a ArrayRef) -> Self {
336 Self { inner }
337 }
338}
339
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{
        BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, StringArray,
    };
    use arrow_schema::{Field, Fields};
    use serde_json::json;

    use super::*;

    /// Reads row `index` of `array` as JSON through [`JsonArray::try_get_value`].
    fn json_at(array: &ArrayRef, index: usize) -> Result<Value> {
        JsonArray::from(array).try_get_value(index)
    }

    /// Aligns `json_array` to the struct type made of `schema_fields` and compares the outcome
    /// with `expected`: either the columns of the aligned struct array, or the error message.
    fn check_align(
        json_array: StructArray,
        schema_fields: Fields,
        expected: std::result::Result<Vec<ArrayRef>, String>,
    ) -> Result<()> {
        let json_array: ArrayRef = Arc::new(json_array);
        let schema_type = DataType::Struct(schema_fields.clone());
        let expected = expected
            .map(|columns| Arc::new(StructArray::new(schema_fields, columns, None)) as ArrayRef);

        let outcome = JsonArray::from(&json_array).try_align(&schema_type);
        match (outcome, expected) {
            (Ok(actual), Ok(expected)) => assert_eq!(&actual, &expected),
            (Ok(actual), Err(e)) => {
                panic!("expecting error {e} but actually get: {actual:?}")
            }
            (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
            (Err(e), Ok(_)) => return Err(e),
        }
        Ok(())
    }

    #[test]
    fn test_try_get_value() -> Result<()> {
        // Null-typed array.
        let null_array = new_null_array(&DataType::Null, 2);
        assert_eq!(json_at(&null_array, 0)?, Value::Null);

        // Scalars, including null slots.
        let bool_array: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
        assert_eq!(json_at(&bool_array, 0)?, json!(true));
        assert_eq!(json_at(&bool_array, 1)?, Value::Null);

        let int_array: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
        assert_eq!(json_at(&int_array, 0)?, json!(-7));
        assert_eq!(json_at(&int_array, 1)?, Value::Null);

        let float_array: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
        assert_eq!(json_at(&float_array, 0)?, json!(1.5));

        let string_array: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
        assert_eq!(json_at(&string_array, 0)?, json!("hello"));
        assert_eq!(json_at(&string_array, 1)?, Value::Null);

        // Binary payloads are parsed as JSON text.
        let binary_array: ArrayRef = Arc::new(BinaryArray::from(vec![
            br#"{"nested":[1,null,"x"]}"#.as_slice(),
            b"null".as_slice(),
        ]));
        assert_eq!(
            json_at(&binary_array, 0)?,
            json!({"nested": [1, null, "x"]})
        );
        assert_eq!(json_at(&binary_array, 1)?, Value::Null);

        // Lists become JSON arrays; a null list is JSON null.
        let list_array: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1), None, Some(3)]),
                None,
            ]));
        assert_eq!(json_at(&list_array, 0)?, json!([1, null, 3]));
        assert_eq!(json_at(&list_array, 1)?, Value::Null);

        // Structs become JSON objects keyed by field name.
        let flag_column = (
            Arc::new(Field::new("flag", DataType::Boolean, true)),
            Arc::new(BooleanArray::from(vec![Some(true), None])) as ArrayRef,
        );
        let items_column = (
            Arc::new(Field::new_list(
                "items",
                Field::new_list_field(DataType::Int64, true),
                true,
            )),
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1), None]),
                Some(vec![Some(2)]),
            ])) as ArrayRef,
        );
        let struct_array: ArrayRef =
            Arc::new(StructArray::from(vec![flag_column, items_column]));
        assert_eq!(
            json_at(&struct_array, 0)?,
            json!({"flag": true, "items": [1, null]})
        );
        assert_eq!(
            json_at(&struct_array, 1)?,
            json!({"flag": null, "items": [2]})
        );

        // Any other data type is rejected.
        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let error = json_at(&unsupported, 0).unwrap_err();
        assert_eq!(error.to_string(), "Invalid JSON: unknown JSON type Int32");

        Ok(())
    }

    #[test]
    fn test_align_json_array() -> Result<()> {
        // A struct array without any field: every expected field is filled with nulls.
        check_align(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )?;

        // One matching field, plus one trailing field that is missing from the array.
        check_align(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )?;

        // List, nested struct and string fields; the expected type adds a leading "bool"
        // field and a "float" field inside the nested struct.
        let source = StructArray::from(vec![
            (
                Arc::new(Field::new_list(
                    "list",
                    Field::new_list_field(DataType::Int64, true),
                    true,
                )),
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])) as ArrayRef,
            ),
            (
                Arc::new(Field::new_struct(
                    "nested",
                    vec![Field::new("int", DataType::Int64, true)],
                    true,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                )])),
            ),
            (
                Arc::new(Field::new("string", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ),
        ]);
        let target_fields = Fields::from(vec![
            Field::new("bool", DataType::Boolean, true),
            Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("float", DataType::Float64, true),
                    Field::new("int", DataType::Int64, true),
                ],
                true,
            ),
            Field::new("string", DataType::Utf8, true),
        ]);
        let aligned_columns = vec![
            Arc::new(BooleanArray::new_null(3)) as ArrayRef,
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1)]),
                None,
                Some(vec![Some(2), Some(3)]),
            ])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("float", DataType::Float64, true)),
                    Arc::new(Float64Array::new_null(3)) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])),
                ),
            ])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ];
        check_align(source, target_fields, Ok(aligned_columns))?;

        Ok(())
    }
}