1use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::{
19 Array, ArrayData, ArrayRef, BooleanBufferBuilder, Int32BufferBuilder, ListArray,
20};
21use arrow::buffer::{Buffer, NullBuffer};
22use arrow::datatypes::DataType as ArrowDataType;
23use serde_json::Value as JsonValue;
24
25use crate::data_type::{ConcreteDataType, DataType};
26use crate::error::Result;
27use crate::scalars::{ScalarVector, ScalarVectorBuilder};
28use crate::serialize::Serializable;
29use crate::types::ListType;
30use crate::value::{ListValue, ListValueRef, Value, ValueRef};
31use crate::vectors::{self, Helper, MutableVector, Validity, Vector, VectorRef};
32
33#[derive(Debug, PartialEq)]
35pub struct ListVector {
36 array: ListArray,
37 item_type: ConcreteDataType,
39}
40
41impl ListVector {
42 pub fn values_iter(&self) -> impl Iterator<Item = Result<Option<VectorRef>>> + '_ {
44 self.array
45 .iter()
46 .map(|value_opt| value_opt.map(Helper::try_into_vector).transpose())
47 }
48
49 pub(crate) fn as_arrow(&self) -> &dyn Array {
50 &self.array
51 }
52}
53
54impl Vector for ListVector {
55 fn data_type(&self) -> ConcreteDataType {
56 ConcreteDataType::List(ListType::new(self.item_type.clone()))
57 }
58
59 fn vector_type_name(&self) -> String {
60 "ListVector".to_string()
61 }
62
63 fn as_any(&self) -> &dyn Any {
64 self
65 }
66
67 fn len(&self) -> usize {
68 self.array.len()
69 }
70
71 fn to_arrow_array(&self) -> ArrayRef {
72 Arc::new(self.array.clone())
73 }
74
75 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
76 Box::new(self.array.clone())
77 }
78
79 fn validity(&self) -> Validity {
80 vectors::impl_validity_for_vector!(self.array)
81 }
82
83 fn memory_size(&self) -> usize {
84 self.array.get_buffer_memory_size()
85 }
86
87 fn null_count(&self) -> usize {
88 self.array.null_count()
89 }
90
91 fn is_null(&self, row: usize) -> bool {
92 self.array.is_null(row)
93 }
94
95 fn slice(&self, offset: usize, length: usize) -> VectorRef {
96 Arc::new(Self {
97 array: self.array.slice(offset, length),
98 item_type: self.item_type.clone(),
99 })
100 }
101
102 fn get(&self, index: usize) -> Value {
103 if !self.array.is_valid(index) {
104 return Value::Null;
105 }
106
107 let array = &self.array.value(index);
108 let vector = Helper::try_into_vector(array).unwrap_or_else(|_| {
109 panic!(
110 "arrow array with datatype {:?} cannot converted to our vector",
111 array.data_type()
112 )
113 });
114 let values = (0..vector.len())
115 .map(|i| vector.get(i))
116 .collect::<Vec<Value>>();
117 Value::List(ListValue::new(values, self.item_type.clone()))
118 }
119
120 fn get_ref(&self, index: usize) -> ValueRef {
121 ValueRef::List(ListValueRef::Indexed {
122 vector: self,
123 idx: index,
124 })
125 }
126}
127
128impl Serializable for ListVector {
129 fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
130 self.array
131 .iter()
132 .map(|v| match v {
133 None => Ok(JsonValue::Null),
134 Some(v) => Helper::try_into_vector(v)
135 .and_then(|v| v.serialize_to_json())
136 .map(JsonValue::Array),
137 })
138 .collect()
139 }
140}
141
142impl From<ListArray> for ListVector {
143 fn from(array: ListArray) -> Self {
144 let item_type = ConcreteDataType::from_arrow_type(match array.data_type() {
145 ArrowDataType::List(field) => field.data_type(),
146 other => panic!("Try to create ListVector from an arrow array with type {other:?}"),
147 });
148 Self { array, item_type }
149 }
150}
151
152vectors::impl_try_from_arrow_array_for_vector!(ListArray, ListVector);
153
154pub struct ListIter<'a> {
155 vector: &'a ListVector,
156 idx: usize,
157}
158
159impl<'a> ListIter<'a> {
160 fn new(vector: &'a ListVector) -> ListIter<'a> {
161 ListIter { vector, idx: 0 }
162 }
163}
164
165impl<'a> Iterator for ListIter<'a> {
166 type Item = Option<ListValueRef<'a>>;
167
168 #[inline]
169 fn next(&mut self) -> Option<Self::Item> {
170 if self.idx >= self.vector.len() {
171 return None;
172 }
173
174 let idx = self.idx;
175 self.idx += 1;
176
177 if self.vector.is_null(idx) {
178 return Some(None);
179 }
180
181 Some(Some(ListValueRef::Indexed {
182 vector: self.vector,
183 idx,
184 }))
185 }
186
187 #[inline]
188 fn size_hint(&self) -> (usize, Option<usize>) {
189 (self.vector.len(), Some(self.vector.len()))
190 }
191}
192
193impl ScalarVector for ListVector {
194 type OwnedItem = ListValue;
195 type RefItem<'a> = ListValueRef<'a>;
196 type Iter<'a> = ListIter<'a>;
197 type Builder = ListVectorBuilder;
198
199 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
200 if self.array.is_valid(idx) {
201 Some(ListValueRef::Indexed { vector: self, idx })
202 } else {
203 None
204 }
205 }
206
207 fn iter_data(&self) -> Self::Iter<'_> {
208 ListIter::new(self)
209 }
210}
211
212pub struct ListVectorBuilder {
216 item_type: ConcreteDataType,
217 offsets_builder: Int32BufferBuilder,
218 null_buffer_builder: NullBufferBuilder,
219 values_builder: Box<dyn MutableVector>,
220}
221
222impl ListVectorBuilder {
223 pub fn with_type_capacity(item_type: ConcreteDataType, capacity: usize) -> ListVectorBuilder {
226 let mut offsets_builder = Int32BufferBuilder::new(capacity + 1);
227 offsets_builder.append(0);
228 let values_builder = item_type.create_mutable_vector(capacity);
231
232 ListVectorBuilder {
233 item_type,
234 offsets_builder,
235 null_buffer_builder: NullBufferBuilder::new(capacity),
236 values_builder,
237 }
238 }
239
240 fn finish_list(&mut self, is_valid: bool) {
242 self.offsets_builder
243 .append(i32::try_from(self.values_builder.len()).unwrap());
244 self.null_buffer_builder.append(is_valid);
245 }
246
247 fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> {
248 for v in list_value.items() {
249 self.values_builder.try_push_value_ref(v.as_value_ref())?;
250 }
251
252 self.finish_list(true);
253 Ok(())
254 }
255}
256
257impl MutableVector for ListVectorBuilder {
258 fn data_type(&self) -> ConcreteDataType {
259 ConcreteDataType::list_datatype(self.item_type.clone())
260 }
261
262 fn len(&self) -> usize {
263 self.null_buffer_builder.len()
264 }
265
266 fn as_any(&self) -> &dyn Any {
267 self
268 }
269
270 fn as_mut_any(&mut self) -> &mut dyn Any {
271 self
272 }
273
274 fn to_vector(&mut self) -> VectorRef {
275 Arc::new(self.finish())
276 }
277
278 fn to_vector_cloned(&self) -> VectorRef {
279 Arc::new(self.finish_cloned())
280 }
281
282 fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
283 if let Some(list_ref) = value.as_list()? {
284 match list_ref {
285 ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? {
286 Some(list_value) => self.push_list_value(list_value)?,
287 None => self.push_null(),
288 },
289 ListValueRef::Ref { val } => self.push_list_value(val)?,
290 }
291 } else {
292 self.push_null();
293 }
294
295 Ok(())
296 }
297
298 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
299 for idx in offset..offset + length {
300 let value = vector.get_ref(idx);
301 self.try_push_value_ref(value)?;
302 }
303
304 Ok(())
305 }
306
307 fn push_null(&mut self) {
308 self.finish_list(false);
309 }
310}
311
312impl ScalarVectorBuilder for ListVectorBuilder {
313 type VectorType = ListVector;
314
315 fn with_capacity(_capacity: usize) -> Self {
316 panic!("Must use ListVectorBuilder::with_type_capacity()");
317 }
318
319 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
320 self.try_push_value_ref(value.into()).unwrap_or_else(|e| {
324 panic!(
325 "Failed to push value, expect value type {:?}, err:{}",
326 self.item_type, e
327 );
328 });
329 }
330
331 fn finish(&mut self) -> Self::VectorType {
332 let len = self.len();
333 let values_vector = self.values_builder.to_vector();
334 let values_arr = values_vector.to_arrow_array();
335 let values_data = values_arr.to_data();
336
337 let offset_buffer = self.offsets_builder.finish();
338 let null_bit_buffer = self.null_buffer_builder.finish();
339 self.offsets_builder.append(0);
341 let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type();
342 let array_data_builder = ArrayData::builder(data_type)
343 .len(len)
344 .add_buffer(offset_buffer)
345 .add_child_data(values_data)
346 .null_bit_buffer(null_bit_buffer);
347
348 let array_data = unsafe { array_data_builder.build_unchecked() };
349 let array = ListArray::from(array_data);
350
351 ListVector {
352 array,
353 item_type: self.item_type.clone(),
354 }
355 }
356
357 fn finish_cloned(&self) -> Self::VectorType {
359 let len = self.len();
360 let values_vector = self.values_builder.to_vector_cloned();
361 let values_arr = values_vector.to_arrow_array();
362 let values_data = values_arr.to_data();
363
364 let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
365 let nulls = self.null_buffer_builder.finish_cloned();
366
367 let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type();
368 let array_data_builder = ArrayData::builder(data_type)
369 .len(len)
370 .add_buffer(offset_buffer)
371 .add_child_data(values_data)
372 .nulls(nulls);
373
374 let array_data = unsafe { array_data_builder.build_unchecked() };
375 let array = ListArray::from(array_data);
376
377 ListVector {
378 array,
379 item_type: self.item_type.clone(),
380 }
381 }
382}
383
384#[derive(Debug)]
391struct NullBufferBuilder {
392 bitmap_builder: Option<BooleanBufferBuilder>,
393 len: usize,
395 capacity: usize,
396}
397
398impl NullBufferBuilder {
399 fn new(capacity: usize) -> Self {
402 Self {
403 bitmap_builder: None,
404 len: 0,
405 capacity,
406 }
407 }
408
409 fn len(&self) -> usize {
410 if let Some(b) = &self.bitmap_builder {
411 b.len()
412 } else {
413 self.len
414 }
415 }
416
417 #[inline]
420 fn append_non_null(&mut self) {
421 if let Some(buf) = self.bitmap_builder.as_mut() {
422 buf.append(true)
423 } else {
424 self.len += 1;
425 }
426 }
427
428 #[inline]
431 fn append_null(&mut self) {
432 self.materialize_if_needed();
433 self.bitmap_builder.as_mut().unwrap().append(false);
434 }
435
436 #[inline]
438 fn append(&mut self, not_null: bool) {
439 if not_null {
440 self.append_non_null()
441 } else {
442 self.append_null()
443 }
444 }
445
446 fn finish(&mut self) -> Option<Buffer> {
449 let buf = self.bitmap_builder.take().map(Into::into);
450 self.len = 0;
451 buf
452 }
453
454 fn finish_cloned(&self) -> Option<NullBuffer> {
456 let buffer = self.bitmap_builder.as_ref()?.finish_cloned();
457 Some(NullBuffer::new(buffer))
458 }
459
460 #[inline]
461 fn materialize_if_needed(&mut self) {
462 if self.bitmap_builder.is_none() {
463 self.materialize()
464 }
465 }
466
467 #[cold]
468 fn materialize(&mut self) {
469 if self.bitmap_builder.is_none() {
470 let mut b = BooleanBufferBuilder::new(self.len.max(self.capacity));
471 b.append_n(self.len, true);
472 self.bitmap_builder = Some(b);
473 }
474 }
475}
476
477#[cfg(test)]
478pub mod tests {
479 use arrow::array::{Int32Array, Int32Builder, ListBuilder};
480 use serde_json::json;
481
482 use super::*;
483 use crate::scalars::ScalarRef;
484 use crate::types::ListType;
485 use crate::vectors::Int32Vector;
486
487 pub fn new_list_vector(data: &[Option<Vec<Option<i32>>>]) -> ListVector {
488 let mut builder =
489 ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 8);
490 for vec_opt in data {
491 if let Some(vec) = vec_opt {
492 let values = vec.iter().map(|v| Value::from(*v)).collect();
493 let list_value = ListValue::new(values, ConcreteDataType::int32_datatype());
494
495 builder.push(Some(ListValueRef::Ref { val: &list_value }));
496 } else {
497 builder.push(None);
498 }
499 }
500
501 builder.finish()
502 }
503
504 fn new_list_array(data: &[Option<Vec<Option<i32>>>]) -> ListArray {
505 let mut builder = ListBuilder::new(Int32Builder::new());
506 for vec_opt in data {
507 if let Some(vec) = vec_opt {
508 for value_opt in vec {
509 builder.values().append_option(*value_opt);
510 }
511
512 builder.append(true);
513 } else {
514 builder.append(false);
515 }
516 }
517
518 builder.finish()
519 }
520
521 #[test]
522 fn test_list_vector() {
523 let data = vec![
524 Some(vec![Some(1), Some(2), Some(3)]),
525 None,
526 Some(vec![Some(4), None, Some(6)]),
527 ];
528
529 let list_vector = new_list_vector(&data);
530
531 assert_eq!(
532 ConcreteDataType::List(ListType::new(ConcreteDataType::int32_datatype())),
533 list_vector.data_type()
534 );
535 assert_eq!("ListVector", list_vector.vector_type_name());
536 assert_eq!(3, list_vector.len());
537 assert!(!list_vector.is_null(0));
538 assert!(list_vector.is_null(1));
539 assert!(!list_vector.is_null(2));
540
541 let arrow_array = new_list_array(&data);
542 assert_eq!(
543 arrow_array,
544 *list_vector
545 .to_arrow_array()
546 .as_any()
547 .downcast_ref::<ListArray>()
548 .unwrap()
549 );
550 let validity = list_vector.validity();
551 assert!(!validity.is_all_null());
552 assert!(!validity.is_all_valid());
553 assert!(validity.is_set(0));
554 assert!(!validity.is_set(1));
555 assert!(validity.is_set(2));
556 assert_eq!(256, list_vector.memory_size());
557
558 let slice = list_vector.slice(0, 2).to_arrow_array();
559 let sliced_array = slice.as_any().downcast_ref::<ListArray>().unwrap();
560 assert_eq!(
561 Int32Array::from_iter_values([1, 2, 3]),
562 *sliced_array
563 .value(0)
564 .as_any()
565 .downcast_ref::<Int32Array>()
566 .unwrap()
567 );
568 assert!(sliced_array.is_null(1));
569
570 assert_eq!(
571 Value::List(ListValue::new(
572 vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)],
573 ConcreteDataType::int32_datatype()
574 )),
575 list_vector.get(0)
576 );
577 let value_ref = list_vector.get_ref(0);
578 assert!(matches!(
579 value_ref,
580 ValueRef::List(ListValueRef::Indexed { .. })
581 ));
582 let value_ref = list_vector.get_ref(1);
583 if let ValueRef::List(ListValueRef::Indexed { idx, .. }) = value_ref {
584 assert_eq!(1, idx);
585 } else {
586 unreachable!()
587 }
588 assert_eq!(Value::Null, list_vector.get(1));
589 assert_eq!(
590 Value::List(ListValue::new(
591 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
592 ConcreteDataType::int32_datatype()
593 )),
594 list_vector.get(2)
595 );
596 }
597
598 #[test]
599 fn test_from_arrow_array() {
600 let data = vec![
601 Some(vec![Some(1), Some(2), Some(3)]),
602 None,
603 Some(vec![Some(4), None, Some(6)]),
604 ];
605
606 let arrow_array = new_list_array(&data);
607 let array_ref: ArrayRef = Arc::new(arrow_array);
608 let expect = new_list_vector(&data);
609
610 let list_vector = ListVector::try_from_arrow_array(array_ref).unwrap();
612 assert_eq!(expect, list_vector);
613
614 let arrow_array = new_list_array(&data);
616 let list_vector = ListVector::from(arrow_array);
617 assert_eq!(expect, list_vector);
618 }
619
620 #[test]
621 fn test_iter_list_vector_values() {
622 let data = vec![
623 Some(vec![Some(1), Some(2), Some(3)]),
624 None,
625 Some(vec![Some(4), None, Some(6)]),
626 ];
627
628 let list_vector = new_list_vector(&data);
629
630 assert_eq!(
631 ConcreteDataType::List(ListType::new(ConcreteDataType::int32_datatype())),
632 list_vector.data_type()
633 );
634 let mut iter = list_vector.values_iter();
635 assert_eq!(
636 Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef,
637 *iter.next().unwrap().unwrap().unwrap()
638 );
639 assert!(iter.next().unwrap().unwrap().is_none());
640 assert_eq!(
641 Arc::new(Int32Vector::from(vec![Some(4), None, Some(6)])) as VectorRef,
642 *iter.next().unwrap().unwrap().unwrap(),
643 );
644 assert!(iter.next().is_none())
645 }
646
647 #[test]
648 fn test_serialize_to_json() {
649 let data = vec![
650 Some(vec![Some(1), Some(2), Some(3)]),
651 None,
652 Some(vec![Some(4), None, Some(6)]),
653 ];
654
655 let list_vector = new_list_vector(&data);
656 assert_eq!(
657 vec![json!([1, 2, 3]), json!(null), json!([4, null, 6]),],
658 list_vector.serialize_to_json().unwrap()
659 );
660 }
661
662 #[test]
663 fn test_list_vector_builder() {
664 let mut builder =
665 ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3);
666 builder.push_value_ref(ValueRef::List(ListValueRef::Ref {
667 val: &ListValue::new(
668 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
669 ConcreteDataType::int32_datatype(),
670 ),
671 }));
672 assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
673
674 let data = vec![
675 Some(vec![Some(1), Some(2), Some(3)]),
676 None,
677 Some(vec![Some(7), Some(8), None]),
678 ];
679 let input = new_list_vector(&data);
680 builder.extend_slice_of(&input, 1, 2).unwrap();
681 assert!(builder
682 .extend_slice_of(&crate::vectors::Int32Vector::from_slice([13]), 0, 1)
683 .is_err());
684 let vector = builder.to_vector();
685
686 let expect: VectorRef = Arc::new(new_list_vector(&[
687 Some(vec![Some(4), None, Some(6)]),
688 None,
689 Some(vec![Some(7), Some(8), None]),
690 ]));
691 assert_eq!(expect, vector);
692 }
693
694 #[test]
695 fn test_list_vector_for_scalar() {
696 let mut builder =
697 ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2);
698 builder.push(None);
699 builder.push(Some(ListValueRef::Ref {
700 val: &ListValue::new(
701 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
702 ConcreteDataType::int32_datatype(),
703 ),
704 }));
705 let vector = builder.finish();
706
707 let expect = new_list_vector(&[None, Some(vec![Some(4), None, Some(6)])]);
708 assert_eq!(expect, vector);
709
710 assert!(vector.get_data(0).is_none());
711 assert_eq!(
712 ListValueRef::Indexed {
713 vector: &vector,
714 idx: 1
715 },
716 vector.get_data(1).unwrap()
717 );
718 assert_eq!(
719 *vector.get(1).as_list().unwrap().unwrap(),
720 vector.get_data(1).unwrap().to_owned_scalar()
721 );
722
723 let mut iter = vector.iter_data();
724 assert!(iter.next().unwrap().is_none());
725 assert_eq!(
726 ListValueRef::Indexed {
727 vector: &vector,
728 idx: 1
729 },
730 iter.next().unwrap().unwrap()
731 );
732 assert!(iter.next().is_none());
733
734 let mut iter = vector.iter_data();
735 assert_eq!(2, iter.size_hint().0);
736 assert_eq!(
737 ListValueRef::Indexed {
738 vector: &vector,
739 idx: 1
740 },
741 iter.nth(1).unwrap().unwrap()
742 );
743 }
744
745 #[test]
746 fn test_list_vector_builder_finish_cloned() {
747 let mut builder =
748 ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2);
749 builder.push(None);
750 builder.push(Some(ListValueRef::Ref {
751 val: &ListValue::new(
752 vec![Value::Int32(4), Value::Null, Value::Int32(6)],
753 ConcreteDataType::int32_datatype(),
754 ),
755 }));
756 let vector = builder.finish_cloned();
757 assert_eq!(vector.len(), 2);
758 assert_eq!(builder.len(), 2);
759 }
760}