1use std::cmp::Ordering;
16use std::sync::Arc;
17
18use arrow::compute;
19use arrow::util::display::{ArrayFormatter, FormatOptions};
20use arrow_array::builder::{
21 ArrayBuilder, BooleanBuilder, Float64Builder, Int64Builder, NullBuilder, StringViewBuilder,
22 make_builder,
23};
24use arrow_array::cast::AsArray;
25use arrow_array::types::{Float64Type, Int64Type, UInt64Type};
26use arrow_array::{Array, ArrayRef, GenericListArray, ListArray, StructArray, new_null_array};
27use arrow_schema::{DataType, FieldRef};
28use common_telemetry::debug;
29use serde_json::Value;
30use snafu::{OptionExt, ResultExt};
31
32use crate::arrow_array::{
33 MutableBinaryArray, StringViewArray, binary_array_value, string_array_value,
34};
35use crate::error::{
36 AlignJsonArraySnafu, ArrowComputeSnafu, CastTypeSnafu, DeserializeSnafu, InvalidJsonSnafu,
37 Result, SerializeSnafu,
38};
39
40pub struct JsonArray<'a> {
41 inner: &'a ArrayRef,
42}
43
44impl JsonArray<'_> {
45 pub fn try_get_value(&self, i: usize) -> Result<Value> {
47 let array = self.inner;
48 if array.is_null(i) {
49 return Ok(Value::Null);
50 }
51
52 let value = match array.data_type() {
53 DataType::Null => Value::Null,
54 DataType::Boolean => Value::Bool(array.as_boolean().value(i)),
55 DataType::Int64 => Value::from(array.as_primitive::<Int64Type>().value(i)),
56 DataType::UInt64 => Value::from(array.as_primitive::<UInt64Type>().value(i)),
57 DataType::Float64 => Value::from(array.as_primitive::<Float64Type>().value(i)),
58 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
59 Value::String(string_array_value(array, i).to_string())
60 }
61 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
62 let bytes = binary_array_value(array, i);
63 serde_json::from_slice(bytes).with_context(|_| DeserializeSnafu {
64 json: String::from_utf8_lossy(bytes),
65 })?
66 }
67 DataType::Struct(_) => {
68 let structs = array.as_struct();
69 let object = structs
70 .fields()
71 .iter()
72 .zip(structs.columns())
73 .map(|(field, column)| {
74 JsonArray::from(column)
75 .try_get_value(i)
76 .map(|v| (field.name().clone(), v))
77 })
78 .collect::<Result<_>>()?;
79 Value::Object(object)
80 }
81 DataType::List(_) => {
82 let lists = array.as_list::<i32>();
83 let list = lists.value(i);
84 let list = JsonArray::from(&list);
85 let mut values = Vec::with_capacity(list.inner.len());
86 for i in 0..list.inner.len() {
87 values.push(list.try_get_value(i)?);
88 }
89 Value::Array(values)
90 }
91 t => {
92 return InvalidJsonSnafu {
93 value: format!("unknown JSON type {t}"),
94 }
95 .fail();
96 }
97 };
98 Ok(value)
99 }
100
101 pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
108 if self.inner.data_type() == expect {
109 return Ok(self.inner.clone());
110 }
111
112 debug!(
113 "Try aligning JSON array {} to data type {}",
114 self.inner.data_type(),
115 expect
116 );
117
118 let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu {
119 reason: "expect struct array",
120 })?;
121 let array_fields = struct_array.fields();
122 let array_columns = struct_array.columns();
123 let DataType::Struct(expect_fields) = expect else {
124 return AlignJsonArraySnafu {
125 reason: "expect struct datatype",
126 }
127 .fail();
128 };
129 let mut aligned = Vec::with_capacity(expect_fields.len());
130
131 debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
136 debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());
137
138 let mut i = 0; let mut j = 0; while i < expect_fields.len() && j < array_fields.len() {
141 let expect_field = &expect_fields[i];
142 let array_field = &array_fields[j];
143 match expect_field.name().cmp(array_field.name()) {
144 Ordering::Equal => {
145 if expect_field.data_type() == array_field.data_type() {
146 aligned.push(array_columns[j].clone());
147 } else {
148 let expect_type = expect_field.data_type();
149 let array_type = array_field.data_type();
150 let array = match (expect_type, array_type) {
151 (DataType::Struct(_), DataType::Struct(_)) => {
152 JsonArray::from(&array_columns[j]).try_align(expect_type)?
153 }
154 (DataType::List(expect_item), DataType::List(array_item)) => {
155 let list_array = array_columns[j].as_list::<i32>();
156 try_align_list(list_array, expect_item, array_item)?
157 }
158 _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?,
159 };
160 aligned.push(array);
161 }
162 i += 1;
163 j += 1;
164 }
165 Ordering::Less => {
166 aligned.push(new_null_array(expect_field.data_type(), struct_array.len()));
167 i += 1;
168 }
169 Ordering::Greater => {
170 j += 1;
171 }
172 }
173 }
174 if i < expect_fields.len() {
175 for field in &expect_fields[i..] {
176 aligned.push(new_null_array(field.data_type(), struct_array.len()));
177 }
178 }
179
180 let json_array = StructArray::try_new(
181 expect_fields.clone(),
182 aligned,
183 struct_array.nulls().cloned(),
184 )
185 .map_err(|e| {
186 AlignJsonArraySnafu {
187 reason: e.to_string(),
188 }
189 .build()
190 })?;
191 Ok(Arc::new(json_array))
192 }
193
194 fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
195 let from_type = self.inner.data_type();
196 if from_type == to_type {
197 return Ok(self.inner.clone());
198 }
199
200 if from_type.is_binary() && !to_type.is_binary() {
201 return self.decode_variant(to_type);
202 }
203
204 if !from_type.is_binary() && to_type.is_binary() {
205 return self.encode_variant();
206 }
207
208 if compute::can_cast_types(from_type, to_type) {
209 return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
210 }
211
212 let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default())
213 .context(ArrowComputeSnafu)?;
214 let values = (0..self.inner.len())
215 .map(|i| {
216 self.inner
217 .is_valid(i)
218 .then(|| formatter.value(i).to_string())
219 })
220 .collect::<Vec<_>>();
221 Ok(Arc::new(StringViewArray::from(values)))
222 }
223
224 fn encode_variant(&self) -> Result<ArrayRef> {
225 let len = self.inner.len();
226 let mut encoded = Vec::with_capacity(len);
227 let mut total_bytes = 0;
228
229 for i in 0..len {
230 let value = self.try_get_value(i)?;
231 if value.is_null() {
232 encoded.push(None);
233 } else {
234 let bytes = serde_json::to_vec(&value).context(SerializeSnafu)?;
235 total_bytes += bytes.len();
236 encoded.push(Some(bytes));
237 }
238 }
239
240 let mut builder = MutableBinaryArray::with_capacity(len, total_bytes);
241 for value in encoded {
242 builder.append_option(value);
243 }
244 Ok(Arc::new(builder.finish()))
245 }
246
247 fn decode_variant(&self, to_type: &DataType) -> Result<ArrayRef> {
248 fn downcast_builder<'a, T: ArrayBuilder>(
249 builder: &'a mut dyn ArrayBuilder,
250 to_type: &DataType,
251 ) -> Result<&'a mut T> {
252 builder
253 .as_any_mut()
254 .downcast_mut::<T>()
255 .with_context(|| CastTypeSnafu {
256 msg: format!("Expect ArrayBuilder is of type {to_type}"),
257 })
258 }
259
260 let mut builder = make_builder(to_type, self.inner.len());
261 if to_type.is_null() {
262 downcast_builder::<NullBuilder>(builder.as_mut(), to_type)?
263 .append_nulls(self.inner.len());
264 } else {
265 match to_type {
266 DataType::Boolean => {
267 let b = downcast_builder::<BooleanBuilder>(builder.as_mut(), to_type)?;
268 for i in 0..self.inner.len() {
269 b.append_option(self.try_get_value(i)?.as_bool());
270 }
271 }
272 DataType::Int64 => {
273 let b = downcast_builder::<Int64Builder>(builder.as_mut(), to_type)?;
274 for i in 0..self.inner.len() {
275 b.append_option(self.try_get_value(i)?.as_i64());
276 }
277 }
278 DataType::Float64 => {
279 let b = downcast_builder::<Float64Builder>(builder.as_mut(), to_type)?;
280 for i in 0..self.inner.len() {
281 b.append_option(self.try_get_value(i)?.as_f64());
282 }
283 }
284 DataType::Utf8View => {
285 let b = downcast_builder::<StringViewBuilder>(builder.as_mut(), to_type)?;
286 for i in 0..self.inner.len() {
287 let v = self.try_get_value(i)?;
288 if v.is_null() {
289 b.append_null();
290 } else if let Some(s) = v.as_str() {
291 b.append_value(s);
292 } else {
293 b.append_value(v.to_string());
294 }
295 }
296 }
297 _ => {
298 return CastTypeSnafu {
299 msg: format!("Cannot cast JSON value to {to_type}"),
300 }
301 .fail();
302 }
303 }
304 }
305 Ok(builder.finish())
306 }
307}
308
309fn try_align_list(
310 list_array: &ListArray,
311 expect_item: &FieldRef,
312 array_item: &FieldRef,
313) -> Result<ArrayRef> {
314 let item_aligned = match (expect_item.data_type(), array_item.data_type()) {
315 (DataType::Struct(_), DataType::Struct(_)) => {
316 JsonArray::from(list_array.values()).try_align(expect_item.data_type())?
317 }
318 (DataType::List(expect_item), DataType::List(array_item)) => {
319 let list_array = list_array.values().as_list::<i32>();
320 try_align_list(list_array, expect_item, array_item)?
321 }
322 _ => JsonArray::from(list_array.values()).try_cast(expect_item.data_type())?,
323 };
324 Ok(Arc::new(
325 GenericListArray::<i32>::try_new(
326 expect_item.clone(),
327 list_array.offsets().clone(),
328 item_aligned,
329 list_array.nulls().cloned(),
330 )
331 .context(ArrowComputeSnafu)?,
332 ))
333}
334
335impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
336 fn from(inner: &'a ArrayRef) -> Self {
337 Self { inner }
338 }
339}
340
#[cfg(test)]
mod test {
    use arrow_array::types::Int64Type;
    use arrow_array::{
        BinaryArray, BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, StringArray,
    };
    use arrow_schema::{Field, Fields};
    use serde_json::json;

    use super::*;

    /// Reads row `row` of `array` as a JSON value.
    fn value_at(array: &ArrayRef, row: usize) -> Result<Value> {
        JsonArray::from(array).try_get_value(row)
    }

    /// Aligns `input` to the struct type described by `schema`, then asserts that the
    /// result equals a struct array built from `schema` and `columns` with no
    /// struct-level nulls.
    fn assert_aligned(input: StructArray, schema: Fields, columns: Vec<ArrayRef>) -> Result<()> {
        let input: ArrayRef = Arc::new(input);
        let target = DataType::Struct(schema.clone());
        let expected: ArrayRef = Arc::new(StructArray::new(schema, columns, None));
        let actual = JsonArray::from(&input).try_align(&target)?;
        assert_eq!(&actual, &expected);
        Ok(())
    }

    #[test]
    fn test_try_get_value() -> Result<()> {
        // Null-typed arrays only produce JSON null.
        let nulls = new_null_array(&DataType::Null, 2);
        assert_eq!(value_at(&nulls, 0)?, Value::Null);

        // Scalars: the value when valid, JSON null otherwise.
        let bools: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
        assert_eq!(value_at(&bools, 0)?, json!(true));
        assert_eq!(value_at(&bools, 1)?, Value::Null);

        let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(-7), None]));
        assert_eq!(value_at(&ints, 0)?, json!(-7));
        assert_eq!(value_at(&ints, 1)?, Value::Null);

        let floats: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.5)]));
        assert_eq!(value_at(&floats, 0)?, json!(1.5));

        let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("hello"), None]));
        assert_eq!(value_at(&strings, 0)?, json!("hello"));
        assert_eq!(value_at(&strings, 1)?, Value::Null);

        // Binary values hold serialized JSON, which is parsed back.
        let binaries: ArrayRef = Arc::new(BinaryArray::from(vec![
            br#"{"nested":[1,null,"x"]}"#.as_slice(),
            b"null".as_slice(),
        ]));
        assert_eq!(value_at(&binaries, 0)?, json!({"nested": [1, null, "x"]}));
        assert_eq!(value_at(&binaries, 1)?, Value::Null);

        // Lists become JSON arrays; a null list becomes JSON null.
        let lists: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1), None, Some(3)]),
            None,
        ]));
        assert_eq!(value_at(&lists, 0)?, json!([1, null, 3]));
        assert_eq!(value_at(&lists, 1)?, Value::Null);

        // Structs become JSON objects, with nested lists converted recursively.
        let flag_field = Arc::new(Field::new("flag", DataType::Boolean, true));
        let flag_column: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true), None]));
        let items_field = Arc::new(Field::new_list(
            "items",
            Field::new_list_field(DataType::Int64, true),
            true,
        ));
        let items_column: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1), None]),
                Some(vec![Some(2)]),
            ]));
        let structs: ArrayRef = Arc::new(StructArray::from(vec![
            (flag_field, flag_column),
            (items_field, items_column),
        ]));
        assert_eq!(
            value_at(&structs, 0)?,
            json!({"flag": true, "items": [1, null]})
        );
        assert_eq!(value_at(&structs, 1)?, json!({"flag": null, "items": [2]}));

        // Types outside the supported set are rejected.
        let unsupported: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let err = value_at(&unsupported, 0).unwrap_err();
        assert_eq!(err.to_string(), "Invalid JSON: unknown JSON type Int32");

        Ok(())
    }

    #[test]
    fn test_align_json_array() -> Result<()> {
        // A struct without fields gets every expected field as an all-null column.
        let bool_fields = Fields::from(vec![Field::new("bool", DataType::Boolean, true)]);
        assert_aligned(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct("nested", bool_fields.clone(), true),
                Field::new("string", DataType::Utf8, true),
            ]),
            vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(bool_fields, 2)),
                Arc::new(StringArray::new_null(2)),
            ],
        )?;

        // Matching fields are kept as is; trailing missing fields are filled with nulls.
        let float_values: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
        assert_aligned(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::clone(&float_values),
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            vec![float_values, Arc::new(StringArray::new_null(3))],
        )?;

        // Mixed case:
        // - a leading field that is missing from the input;
        // - an identical list field;
        // - a nested struct that needs recursive alignment;
        // - an identical string field.
        let list_field =
            Field::new_list("list", Field::new_list_field(DataType::Int64, true), true);
        let make_list = || -> ArrayRef {
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1)]),
                None,
                Some(vec![Some(2), Some(3)]),
            ]))
        };
        let int_field = Arc::new(Field::new("int", DataType::Int64, true));
        let int_values: ArrayRef = Arc::new(Int64Array::from(vec![-1, -2, -3]));
        let string_values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));

        let input = StructArray::from(vec![
            (Arc::new(list_field.clone()), make_list()),
            (
                Arc::new(Field::new_struct(
                    "nested",
                    vec![Field::new("int", DataType::Int64, true)],
                    true,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::clone(&int_field),
                    Arc::clone(&int_values),
                )])) as ArrayRef,
            ),
            (
                Arc::new(Field::new("string", DataType::Utf8, true)),
                Arc::clone(&string_values),
            ),
        ]);
        let schema = Fields::from(vec![
            Field::new("bool", DataType::Boolean, true),
            list_field,
            Field::new_struct(
                "nested",
                vec![
                    Field::new("float", DataType::Float64, true),
                    Field::new("int", DataType::Int64, true),
                ],
                true,
            ),
            Field::new("string", DataType::Utf8, true),
        ]);
        let expected_nested: ArrayRef = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::new_null(3)) as ArrayRef,
            ),
            (int_field, int_values),
        ]));
        assert_aligned(
            input,
            schema,
            vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                make_list(),
                expected_nested,
                string_values,
            ],
        )?;

        Ok(())
    }
}