1use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::{
19 Array, ArrayData, ArrayRef, BooleanBufferBuilder, Int32BufferBuilder, ListArray,
20};
21use arrow::buffer::{Buffer, NullBuffer};
22use arrow::datatypes::DataType as ArrowDataType;
23use serde_json::Value as JsonValue;
24
25use crate::data_type::{ConcreteDataType, DataType};
26use crate::error::Result;
27use crate::scalars::{ScalarVector, ScalarVectorBuilder};
28use crate::serialize::Serializable;
29use crate::types::ListType;
30use crate::value::{ListValue, ListValueRef, Value, ValueRef};
31use crate::vectors::{self, Helper, MutableVector, Validity, Vector, VectorRef};
32
33#[derive(Debug, PartialEq)]
35pub struct ListVector {
36 array: ListArray,
37 item_type: Arc<ConcreteDataType>,
39}
40
41impl ListVector {
42 pub fn values_iter(&self) -> impl Iterator<Item = Result<Option<VectorRef>>> + '_ {
44 self.array
45 .iter()
46 .map(|value_opt| value_opt.map(Helper::try_into_vector).transpose())
47 }
48
49 pub(crate) fn as_arrow(&self) -> &dyn Array {
50 &self.array
51 }
52
53 pub(crate) fn item_type(&self) -> Arc<ConcreteDataType> {
54 self.item_type.clone()
55 }
56}
57
58impl Vector for ListVector {
59 fn data_type(&self) -> ConcreteDataType {
60 ConcreteDataType::List(ListType::new(self.item_type()))
61 }
62
63 fn vector_type_name(&self) -> String {
64 "ListVector".to_string()
65 }
66
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn len(&self) -> usize {
72 self.array.len()
73 }
74
75 fn to_arrow_array(&self) -> ArrayRef {
76 Arc::new(self.array.clone())
77 }
78
79 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
80 Box::new(self.array.clone())
81 }
82
83 fn validity(&self) -> Validity {
84 vectors::impl_validity_for_vector!(self.array)
85 }
86
87 fn memory_size(&self) -> usize {
88 self.array.get_buffer_memory_size()
89 }
90
91 fn null_count(&self) -> usize {
92 self.array.null_count()
93 }
94
95 fn is_null(&self, row: usize) -> bool {
96 self.array.is_null(row)
97 }
98
99 fn slice(&self, offset: usize, length: usize) -> VectorRef {
100 Arc::new(Self {
101 array: self.array.slice(offset, length),
102 item_type: self.item_type.clone(),
103 })
104 }
105
106 fn get(&self, index: usize) -> Value {
107 if !self.array.is_valid(index) {
108 return Value::Null;
109 }
110
111 let array = &self.array.value(index);
112 let vector = Helper::try_into_vector(array).unwrap_or_else(|_| {
113 panic!(
114 "arrow array with datatype {:?} cannot converted to our vector",
115 array.data_type()
116 )
117 });
118 let values = (0..vector.len())
119 .map(|i| vector.get(i))
120 .collect::<Vec<Value>>();
121 Value::List(ListValue::new(values, self.item_type.clone()))
122 }
123
124 fn get_ref(&self, index: usize) -> ValueRef<'_> {
125 ValueRef::List(ListValueRef::Indexed {
126 vector: self,
127 idx: index,
128 })
129 }
130}
131
132impl Serializable for ListVector {
133 fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
134 self.array
135 .iter()
136 .map(|v| match v {
137 None => Ok(JsonValue::Null),
138 Some(v) => Helper::try_into_vector(v)
139 .and_then(|v| v.serialize_to_json())
140 .map(JsonValue::Array),
141 })
142 .collect()
143 }
144}
145
146impl From<ListArray> for ListVector {
147 fn from(array: ListArray) -> Self {
148 let item_type = Arc::new(ConcreteDataType::from_arrow_type(match array.data_type() {
149 ArrowDataType::List(field) => field.data_type(),
150 other => panic!("Try to create ListVector from an arrow array with type {other:?}"),
151 }));
152 Self { array, item_type }
153 }
154}
155
156vectors::impl_try_from_arrow_array_for_vector!(ListArray, ListVector);
157
158pub struct ListIter<'a> {
159 vector: &'a ListVector,
160 idx: usize,
161}
162
163impl<'a> ListIter<'a> {
164 fn new(vector: &'a ListVector) -> ListIter<'a> {
165 ListIter { vector, idx: 0 }
166 }
167}
168
169impl<'a> Iterator for ListIter<'a> {
170 type Item = Option<ListValueRef<'a>>;
171
172 #[inline]
173 fn next(&mut self) -> Option<Self::Item> {
174 if self.idx >= self.vector.len() {
175 return None;
176 }
177
178 let idx = self.idx;
179 self.idx += 1;
180
181 if self.vector.is_null(idx) {
182 return Some(None);
183 }
184
185 Some(Some(ListValueRef::Indexed {
186 vector: self.vector,
187 idx,
188 }))
189 }
190
191 #[inline]
192 fn size_hint(&self) -> (usize, Option<usize>) {
193 (self.vector.len(), Some(self.vector.len()))
194 }
195}
196
197impl ScalarVector for ListVector {
198 type OwnedItem = ListValue;
199 type RefItem<'a> = ListValueRef<'a>;
200 type Iter<'a> = ListIter<'a>;
201 type Builder = ListVectorBuilder;
202
203 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
204 if self.array.is_valid(idx) {
205 Some(ListValueRef::Indexed { vector: self, idx })
206 } else {
207 None
208 }
209 }
210
211 fn iter_data(&self) -> Self::Iter<'_> {
212 ListIter::new(self)
213 }
214}
215
216pub struct ListVectorBuilder {
220 item_type: Arc<ConcreteDataType>,
221 offsets_builder: Int32BufferBuilder,
222 null_buffer_builder: NullBufferBuilder,
223 values_builder: Box<dyn MutableVector>,
224}
225
226impl ListVectorBuilder {
227 pub fn with_type_capacity(
230 item_type: Arc<ConcreteDataType>,
231 capacity: usize,
232 ) -> ListVectorBuilder {
233 let mut offsets_builder = Int32BufferBuilder::new(capacity + 1);
234 offsets_builder.append(0);
235 let values_builder = item_type.create_mutable_vector(capacity);
238
239 ListVectorBuilder {
240 item_type,
241 offsets_builder,
242 null_buffer_builder: NullBufferBuilder::new(capacity),
243 values_builder,
244 }
245 }
246
247 fn finish_list(&mut self, is_valid: bool) {
249 self.offsets_builder
250 .append(i32::try_from(self.values_builder.len()).unwrap());
251 self.null_buffer_builder.append(is_valid);
252 }
253
254 fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> {
255 for v in list_value.items() {
256 self.values_builder.try_push_value_ref(&v.as_value_ref())?;
257 }
258
259 self.finish_list(true);
260 Ok(())
261 }
262}
263
264impl MutableVector for ListVectorBuilder {
265 fn data_type(&self) -> ConcreteDataType {
266 ConcreteDataType::list_datatype(self.item_type.clone())
267 }
268
269 fn len(&self) -> usize {
270 self.null_buffer_builder.len()
271 }
272
273 fn as_any(&self) -> &dyn Any {
274 self
275 }
276
277 fn as_mut_any(&mut self) -> &mut dyn Any {
278 self
279 }
280
281 fn to_vector(&mut self) -> VectorRef {
282 Arc::new(self.finish())
283 }
284
285 fn to_vector_cloned(&self) -> VectorRef {
286 Arc::new(self.finish_cloned())
287 }
288
289 fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
290 if let Some(list_ref) = value.try_into_list()? {
291 match list_ref {
292 ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? {
293 Some(list_value) => self.push_list_value(list_value)?,
294 None => self.push_null(),
295 },
296 ListValueRef::Ref { val } => self.push_list_value(val)?,
297 ListValueRef::RefList { val, item_datatype } => {
298 let list_value = ListValue::new(
299 val.iter().map(|v| Value::from(v.clone())).collect(),
300 item_datatype.clone(),
301 );
302 self.push_list_value(&list_value)?;
303 }
304 }
305 } else {
306 self.push_null();
307 }
308
309 Ok(())
310 }
311
312 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
313 for idx in offset..offset + length {
314 let value = vector.get_ref(idx);
315 self.try_push_value_ref(&value)?;
316 }
317
318 Ok(())
319 }
320
321 fn push_null(&mut self) {
322 self.finish_list(false);
323 }
324}
325
326impl ScalarVectorBuilder for ListVectorBuilder {
327 type VectorType = ListVector;
328
329 fn with_capacity(_capacity: usize) -> Self {
330 panic!("Must use ListVectorBuilder::with_type_capacity()");
331 }
332
333 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
334 self.try_push_value_ref(&value.into()).unwrap_or_else(|e| {
338 panic!(
339 "Failed to push value, expect value type {:?}, err:{}",
340 self.item_type, e
341 );
342 });
343 }
344
345 fn finish(&mut self) -> Self::VectorType {
346 let len = self.len();
347 let values_vector = self.values_builder.to_vector();
348 let values_arr = values_vector.to_arrow_array();
349 let values_data = values_arr.to_data();
350
351 let offset_buffer = self.offsets_builder.finish();
352 let null_bit_buffer = self.null_buffer_builder.finish();
353 self.offsets_builder.append(0);
355 let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type();
356 let array_data_builder = ArrayData::builder(data_type)
357 .len(len)
358 .add_buffer(offset_buffer)
359 .add_child_data(values_data)
360 .null_bit_buffer(null_bit_buffer);
361
362 let array_data = unsafe { array_data_builder.build_unchecked() };
363 let array = ListArray::from(array_data);
364
365 ListVector {
366 array,
367 item_type: self.item_type.clone(),
368 }
369 }
370
371 fn finish_cloned(&self) -> Self::VectorType {
373 let len = self.len();
374 let values_vector = self.values_builder.to_vector_cloned();
375 let values_arr = values_vector.to_arrow_array();
376 let values_data = values_arr.to_data();
377
378 let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
379 let nulls = self.null_buffer_builder.finish_cloned();
380
381 let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type();
382 let array_data_builder = ArrayData::builder(data_type)
383 .len(len)
384 .add_buffer(offset_buffer)
385 .add_child_data(values_data)
386 .nulls(nulls);
387
388 let array_data = unsafe { array_data_builder.build_unchecked() };
389 let array = ListArray::from(array_data);
390
391 ListVector {
392 array,
393 item_type: self.item_type.clone(),
394 }
395 }
396}
397
398#[derive(Debug)]
405struct NullBufferBuilder {
406 bitmap_builder: Option<BooleanBufferBuilder>,
407 len: usize,
409 capacity: usize,
410}
411
412impl NullBufferBuilder {
413 fn new(capacity: usize) -> Self {
416 Self {
417 bitmap_builder: None,
418 len: 0,
419 capacity,
420 }
421 }
422
423 fn len(&self) -> usize {
424 if let Some(b) = &self.bitmap_builder {
425 b.len()
426 } else {
427 self.len
428 }
429 }
430
431 #[inline]
434 fn append_non_null(&mut self) {
435 if let Some(buf) = self.bitmap_builder.as_mut() {
436 buf.append(true)
437 } else {
438 self.len += 1;
439 }
440 }
441
442 #[inline]
445 fn append_null(&mut self) {
446 self.materialize_if_needed();
447 self.bitmap_builder.as_mut().unwrap().append(false);
448 }
449
450 #[inline]
452 fn append(&mut self, not_null: bool) {
453 if not_null {
454 self.append_non_null()
455 } else {
456 self.append_null()
457 }
458 }
459
460 fn finish(&mut self) -> Option<Buffer> {
463 let buf = self.bitmap_builder.take().map(Into::into);
464 self.len = 0;
465 buf
466 }
467
468 fn finish_cloned(&self) -> Option<NullBuffer> {
470 let buffer = self.bitmap_builder.as_ref()?.finish_cloned();
471 Some(NullBuffer::new(buffer))
472 }
473
474 #[inline]
475 fn materialize_if_needed(&mut self) {
476 if self.bitmap_builder.is_none() {
477 self.materialize()
478 }
479 }
480
481 #[cold]
482 fn materialize(&mut self) {
483 if self.bitmap_builder.is_none() {
484 let mut b = BooleanBufferBuilder::new(self.len.max(self.capacity));
485 b.append_n(self.len, true);
486 self.bitmap_builder = Some(b);
487 }
488 }
489}
490
491#[cfg(test)]
492pub mod tests {
493 use arrow::array::{Int32Array, Int32Builder, ListBuilder};
494 use serde_json::json;
495
496 use super::*;
497 use crate::scalars::ScalarRef;
498 use crate::types::ListType;
499 use crate::vectors::Int32Vector;
500
501 pub fn new_list_vector(data: &[Option<Vec<Option<i32>>>]) -> ListVector {
502 let item_type = Arc::new(ConcreteDataType::int32_datatype());
503 let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 8);
504 for vec_opt in data {
505 if let Some(vec) = vec_opt {
506 let values = vec.iter().map(|v| Value::from(*v)).collect();
507 let list_value = ListValue::new(values, item_type.clone());
508
509 builder.push(Some(ListValueRef::Ref { val: &list_value }));
510 } else {
511 builder.push(None);
512 }
513 }
514
515 builder.finish()
516 }
517
518 fn new_list_array(data: &[Option<Vec<Option<i32>>>]) -> ListArray {
519 let mut builder = ListBuilder::new(Int32Builder::new());
520 for vec_opt in data {
521 if let Some(vec) = vec_opt {
522 for value_opt in vec {
523 builder.values().append_option(*value_opt);
524 }
525
526 builder.append(true);
527 } else {
528 builder.append(false);
529 }
530 }
531
532 builder.finish()
533 }
534
535 #[test]
536 fn test_list_vector() {
537 let data = vec![
538 Some(vec![Some(1), Some(2), Some(3)]),
539 None,
540 Some(vec![Some(4), None, Some(6)]),
541 ];
542
543 let item_type = Arc::new(ConcreteDataType::int32_datatype());
544 let list_vector = new_list_vector(&data);
545
546 assert_eq!(
547 ConcreteDataType::List(ListType::new(item_type.clone())),
548 list_vector.data_type()
549 );
550 assert_eq!("ListVector", list_vector.vector_type_name());
551 assert_eq!(3, list_vector.len());
552 assert!(!list_vector.is_null(0));
553 assert!(list_vector.is_null(1));
554 assert!(!list_vector.is_null(2));
555
556 let arrow_array = new_list_array(&data);
557 assert_eq!(
558 arrow_array,
559 *list_vector
560 .to_arrow_array()
561 .as_any()
562 .downcast_ref::<ListArray>()
563 .unwrap()
564 );
565 let validity = list_vector.validity();
566 assert!(!validity.is_all_null());
567 assert!(!validity.is_all_valid());
568 assert!(validity.is_set(0));
569 assert!(!validity.is_set(1));
570 assert!(validity.is_set(2));
571 assert_eq!(224, list_vector.memory_size());
572
573 let slice = list_vector.slice(0, 2).to_arrow_array();
574 let sliced_array = slice.as_any().downcast_ref::<ListArray>().unwrap();
575 assert_eq!(
576 Int32Array::from_iter_values([1, 2, 3]),
577 *sliced_array
578 .value(0)
579 .as_any()
580 .downcast_ref::<Int32Array>()
581 .unwrap()
582 );
583 assert!(sliced_array.is_null(1));
584
585 assert_eq!(
586 Value::List(ListValue::new(
587 vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)],
588 item_type.clone()
589 )),
590 list_vector.get(0)
591 );
592 let value_ref = list_vector.get_ref(0);
593 assert!(matches!(
594 value_ref,
595 ValueRef::List(ListValueRef::Indexed { .. })
596 ));
597 let value_ref = list_vector.get_ref(1);
598 if let ValueRef::List(ListValueRef::Indexed { idx, .. }) = value_ref {
599 assert_eq!(1, idx);
600 } else {
601 unreachable!()
602 }
603 assert_eq!(Value::Null, list_vector.get(1));
604 assert_eq!(
605 Value::List(ListValue::new(
606 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
607 item_type.clone()
608 )),
609 list_vector.get(2)
610 );
611 }
612
613 #[test]
614 fn test_from_arrow_array() {
615 let data = vec![
616 Some(vec![Some(1), Some(2), Some(3)]),
617 None,
618 Some(vec![Some(4), None, Some(6)]),
619 ];
620
621 let arrow_array = new_list_array(&data);
622 let array_ref: ArrayRef = Arc::new(arrow_array);
623 let expect = new_list_vector(&data);
624
625 let list_vector = ListVector::try_from_arrow_array(array_ref).unwrap();
627 assert_eq!(expect, list_vector);
628
629 let arrow_array = new_list_array(&data);
631 let list_vector = ListVector::from(arrow_array);
632 assert_eq!(expect, list_vector);
633 }
634
635 #[test]
636 fn test_iter_list_vector_values() {
637 let data = vec![
638 Some(vec![Some(1), Some(2), Some(3)]),
639 None,
640 Some(vec![Some(4), None, Some(6)]),
641 ];
642
643 let item_type = Arc::new(ConcreteDataType::int32_datatype());
644 let list_vector = new_list_vector(&data);
645
646 assert_eq!(
647 ConcreteDataType::List(ListType::new(item_type.clone())),
648 list_vector.data_type()
649 );
650 let mut iter = list_vector.values_iter();
651 assert_eq!(
652 Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef,
653 *iter.next().unwrap().unwrap().unwrap()
654 );
655 assert!(iter.next().unwrap().unwrap().is_none());
656 assert_eq!(
657 Arc::new(Int32Vector::from(vec![Some(4), None, Some(6)])) as VectorRef,
658 *iter.next().unwrap().unwrap().unwrap(),
659 );
660 assert!(iter.next().is_none())
661 }
662
663 #[test]
664 fn test_serialize_to_json() {
665 let data = vec![
666 Some(vec![Some(1), Some(2), Some(3)]),
667 None,
668 Some(vec![Some(4), None, Some(6)]),
669 ];
670
671 let list_vector = new_list_vector(&data);
672 assert_eq!(
673 vec![json!([1, 2, 3]), json!(null), json!([4, null, 6]),],
674 list_vector.serialize_to_json().unwrap()
675 );
676 }
677
678 #[test]
679 fn test_list_vector_builder() {
680 let item_type = Arc::new(ConcreteDataType::int32_datatype());
681 let mut builder = ListType::new(item_type.clone()).create_mutable_vector(3);
682 builder.push_value_ref(&ValueRef::List(ListValueRef::Ref {
683 val: &ListValue::new(
684 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
685 item_type.clone(),
686 ),
687 }));
688 assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
689
690 let data = vec![
691 Some(vec![Some(1), Some(2), Some(3)]),
692 None,
693 Some(vec![Some(7), Some(8), None]),
694 ];
695 let input = new_list_vector(&data);
696 builder.extend_slice_of(&input, 1, 2).unwrap();
697 assert!(
698 builder
699 .extend_slice_of(&crate::vectors::Int32Vector::from_slice([13]), 0, 1)
700 .is_err()
701 );
702 let vector = builder.to_vector();
703
704 let expect: VectorRef = Arc::new(new_list_vector(&[
705 Some(vec![Some(4), None, Some(6)]),
706 None,
707 Some(vec![Some(7), Some(8), None]),
708 ]));
709 assert_eq!(expect, vector);
710 }
711
712 #[test]
713 fn test_list_vector_for_scalar() {
714 let item_type = Arc::new(ConcreteDataType::int32_datatype());
715 let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 2);
716 builder.push(None);
717 builder.push(Some(ListValueRef::Ref {
718 val: &ListValue::new(
719 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
720 item_type.clone(),
721 ),
722 }));
723 let vector = builder.finish();
724
725 let expect = new_list_vector(&[None, Some(vec![Some(4), None, Some(6)])]);
726 assert_eq!(expect, vector);
727
728 assert!(vector.get_data(0).is_none());
729 assert_eq!(
730 ListValueRef::Indexed {
731 vector: &vector,
732 idx: 1
733 },
734 vector.get_data(1).unwrap()
735 );
736 assert_eq!(
737 *vector.get(1).as_list().unwrap().unwrap(),
738 vector.get_data(1).unwrap().to_owned_scalar()
739 );
740
741 let mut iter = vector.iter_data();
742 assert!(iter.next().unwrap().is_none());
743 assert_eq!(
744 ListValueRef::Indexed {
745 vector: &vector,
746 idx: 1
747 },
748 iter.next().unwrap().unwrap()
749 );
750 assert!(iter.next().is_none());
751
752 let mut iter = vector.iter_data();
753 assert_eq!(2, iter.size_hint().0);
754 assert_eq!(
755 ListValueRef::Indexed {
756 vector: &vector,
757 idx: 1
758 },
759 iter.nth(1).unwrap().unwrap()
760 );
761 }
762
763 #[test]
764 fn test_list_vector_builder_finish_cloned() {
765 let item_type = Arc::new(ConcreteDataType::int32_datatype());
766 let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 2);
767 builder.push(None);
768 builder.push(Some(ListValueRef::Ref {
769 val: &ListValue::new(
770 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
771 item_type.clone(),
772 ),
773 }));
774 let vector = builder.finish_cloned();
775 assert_eq!(vector.len(), 2);
776 assert_eq!(builder.len(), 2);
777 }
778}