1use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::NullBufferBuilder;
19use arrow::compute::TakeOptions;
20use arrow::datatypes::DataType as ArrowDataType;
21use arrow_array::{Array, ArrayRef, StructArray};
22use datafusion_common::ScalarValue;
23use snafu::{ResultExt, ensure};
24
25use crate::error::{
26 ArrowComputeSnafu, ConversionSnafu, Error, InconsistentStructFieldsAndItemsSnafu, Result,
27 SerializeSnafu, UnsupportedOperationSnafu,
28};
29use crate::prelude::{ConcreteDataType, DataType, ScalarVector, ScalarVectorBuilder};
30use crate::serialize::Serializable;
31use crate::types::StructType;
32use crate::value::{StructValue, StructValueRef, Value, ValueRef};
33use crate::vectors::operations::VectorOp;
34use crate::vectors::{self, Helper, MutableVector, Validity, Vector, VectorRef};
35
36#[derive(Debug, PartialEq)]
38pub struct StructVector {
39 array: StructArray,
40 fields: StructType,
41}
42
43impl StructVector {
44 pub fn try_new(fields: StructType, array: StructArray) -> Result<Self> {
45 ensure!(
46 fields.fields().len() == array.fields().len(),
47 InconsistentStructFieldsAndItemsSnafu {
48 field_len: fields.fields().len(),
49 item_len: array.fields().len(),
50 }
51 );
52 Ok(StructVector { array, fields })
53 }
54
55 pub fn array(&self) -> &StructArray {
56 &self.array
57 }
58
59 pub fn as_arrow(&self) -> &dyn Array {
60 &self.array
61 }
62
63 pub fn struct_type(&self) -> &StructType {
64 &self.fields
65 }
66}
67
68impl Vector for StructVector {
69 fn data_type(&self) -> ConcreteDataType {
70 ConcreteDataType::struct_datatype(self.fields.clone())
71 }
72
73 fn vector_type_name(&self) -> String {
74 "StructVector".to_string()
75 }
76
77 fn as_any(&self) -> &dyn std::any::Any {
78 self
79 }
80
81 fn len(&self) -> usize {
82 self.array.len()
83 }
84
85 fn to_arrow_array(&self) -> ArrayRef {
86 Arc::new(self.array.clone())
87 }
88
89 fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
90 Box::new(self.array.clone())
91 }
92
93 fn validity(&self) -> Validity {
94 vectors::impl_validity_for_vector!(self.array)
95 }
96
97 fn memory_size(&self) -> usize {
98 self.array.get_buffer_memory_size()
99 }
100
101 fn null_count(&self) -> usize {
102 self.array.null_count()
103 }
104
105 fn is_null(&self, row: usize) -> bool {
106 self.array.is_null(row)
107 }
108
109 fn slice(&self, offset: usize, length: usize) -> VectorRef {
110 Arc::new(StructVector {
111 array: self.array.slice(offset, length),
112 fields: self.fields.clone(),
113 })
114 }
115
116 fn get(&self, index: usize) -> Value {
117 if !self.array.is_valid(index) {
118 return Value::Null;
119 }
120
121 let values = (0..self.fields.fields().len())
122 .map(|i| {
123 let field_array = &self.array.column(i);
124
125 if field_array.is_null(i) {
126 Value::Null
127 } else {
128 let scalar_value = ScalarValue::try_from_array(field_array, index).unwrap();
129 Value::try_from(scalar_value).unwrap()
130 }
131 })
132 .collect();
133
134 Value::Struct(StructValue::try_new(values, self.fields.clone()).unwrap())
135 }
136
137 fn get_ref(&self, index: usize) -> ValueRef<'_> {
138 ValueRef::Struct(StructValueRef::Indexed {
139 vector: self,
140 idx: index,
141 })
142 }
143}
144
145impl VectorOp for StructVector {
146 fn replicate(&self, offsets: &[usize]) -> VectorRef {
147 let column_arrays = self
148 .array
149 .columns()
150 .iter()
151 .map(|col| {
152 let vector = Helper::try_into_vector(col)
153 .expect("Failed to replicate struct vector columns");
154 vector.replicate(offsets).to_arrow_array()
155 })
156 .collect::<Vec<_>>();
157 let replicated_array = StructArray::new(
158 self.array.fields().clone(),
159 column_arrays,
160 self.array.nulls().cloned(),
161 );
162 Arc::new(StructVector::try_new(self.fields.clone(), replicated_array).unwrap())
163 }
164
165 fn cast(&self, _to_type: &ConcreteDataType) -> Result<VectorRef> {
166 UnsupportedOperationSnafu {
167 op: "cast",
168 vector_type: self.vector_type_name(),
169 }
170 .fail()
171 }
172
173 fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
174 let filtered =
175 datafusion_common::arrow::compute::filter(&self.array, filter.as_boolean_array())
176 .context(ArrowComputeSnafu)
177 .and_then(Helper::try_into_vector)?;
178 Ok(filtered)
179 }
180
181 fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
182 let take_result = datafusion_common::arrow::compute::take(
183 &self.array,
184 indices.as_arrow(),
185 Some(TakeOptions { check_bounds: true }),
186 )
187 .context(ArrowComputeSnafu)
188 .and_then(Helper::try_into_vector)?;
189 Ok(take_result)
190 }
191}
192
193impl Serializable for StructVector {
194 fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
195 let vectors = self
196 .array
197 .columns()
198 .iter()
199 .map(|value_array| Helper::try_into_vector(value_array))
200 .collect::<Result<Vec<_>>>()?;
201
202 (0..self.array.len())
203 .map(|idx| {
204 let mut result = serde_json::Map::with_capacity(vectors.len());
205 for (field, vector) in self.fields.fields().iter().zip(vectors.iter()) {
206 let field_value = vector.get(idx);
207 result.insert(
208 field.name().to_string(),
209 field_value.try_into().context(SerializeSnafu)?,
210 );
211 }
212 Ok(result.into())
213 })
214 .collect::<Result<Vec<serde_json::Value>>>()
215 }
216}
217
218impl TryFrom<StructArray> for StructVector {
219 type Error = Error;
220
221 fn try_from(array: StructArray) -> Result<Self> {
222 let fields = match array.data_type() {
223 ArrowDataType::Struct(fields) => StructType::try_from(fields)?,
224 other => ConversionSnafu {
225 from: other.to_string(),
226 }
227 .fail()?,
228 };
229 Ok(Self { array, fields })
230 }
231}
232
233impl ScalarVector for StructVector {
234 type OwnedItem = StructValue;
235 type RefItem<'a> = StructValueRef<'a>;
236 type Iter<'a> = StructIter<'a>;
237 type Builder = StructVectorBuilder;
238
239 fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
240 if self.array.is_valid(idx) {
241 Some(StructValueRef::Indexed { vector: self, idx })
242 } else {
243 None
244 }
245 }
246
247 fn iter_data(&self) -> Self::Iter<'_> {
248 StructIter::new(self)
249 }
250}
251
252pub struct StructIter<'a> {
253 vector: &'a StructVector,
254 index: usize,
255}
256
257impl<'a> StructIter<'a> {
258 pub fn new(vector: &'a StructVector) -> Self {
259 Self { vector, index: 0 }
260 }
261}
262
263impl<'a> Iterator for StructIter<'a> {
264 type Item = Option<StructValueRef<'a>>;
265
266 fn next(&mut self) -> Option<Self::Item> {
267 if self.index < self.vector.len() {
268 let idx = self.index;
269 self.index += 1;
270
271 if self.vector.is_null(idx) {
272 Some(None)
273 } else {
274 let value = StructValueRef::Indexed {
275 vector: self.vector,
276 idx,
277 };
278
279 Some(Some(value))
280 }
281 } else {
282 None
283 }
284 }
285
286 fn size_hint(&self) -> (usize, Option<usize>) {
287 (self.vector.len(), Some(self.vector.len()))
288 }
289}
290
291pub struct StructVectorBuilder {
292 value_builders: Vec<Box<dyn MutableVector>>,
293 null_buffer: NullBufferBuilder,
294 fields: StructType,
295}
296
297impl StructVectorBuilder {
298 pub fn with_type_and_capacity(fields: StructType, capacity: usize) -> Self {
299 let value_builders = fields
300 .fields()
301 .iter()
302 .map(|f| f.data_type().create_mutable_vector(capacity))
303 .collect();
304 Self {
305 value_builders,
306 null_buffer: NullBufferBuilder::new(capacity),
307 fields,
308 }
309 }
310
311 fn push_struct_value(&mut self, struct_value: &StructValue) -> Result<()> {
312 for (index, value) in struct_value.items().iter().enumerate() {
313 self.value_builders[index].try_push_value_ref(&value.as_value_ref())?;
314 }
315 self.null_buffer.append_non_null();
316
317 Ok(())
318 }
319
320 fn push_null_struct_value(&mut self) {
321 for builder in &mut self.value_builders {
322 builder.push_null();
323 }
324 self.null_buffer.append_null();
325 }
326}
327
328impl MutableVector for StructVectorBuilder {
329 fn data_type(&self) -> ConcreteDataType {
330 ConcreteDataType::struct_datatype(self.fields.clone())
331 }
332
333 fn len(&self) -> usize {
334 self.null_buffer.len()
335 }
336
337 fn as_any(&self) -> &dyn Any {
338 self
339 }
340
341 fn as_mut_any(&mut self) -> &mut dyn Any {
342 self
343 }
344
345 fn to_vector(&mut self) -> VectorRef {
346 Arc::new(self.finish())
347 }
348
349 fn to_vector_cloned(&self) -> VectorRef {
350 Arc::new(self.finish_cloned())
351 }
352
353 fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
354 if let Some(struct_ref) = value.try_into_struct()? {
355 match struct_ref {
356 StructValueRef::Indexed { vector, idx } => match vector.get(idx).as_struct()? {
357 Some(struct_value) => self.push_struct_value(struct_value)?,
358 None => self.push_null(),
359 },
360 StructValueRef::Ref(val) => self.push_struct_value(val)?,
361 StructValueRef::RefList { val, fields } => {
362 let struct_value = StructValue::try_new(
363 val.iter().map(|v| Value::from(v.clone())).collect(),
364 fields.clone(),
365 )?;
366 self.push_struct_value(&struct_value)?;
367 }
368 }
369 } else {
370 self.push_null();
371 }
372
373 Ok(())
374 }
375
376 fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
377 for idx in offset..offset + length {
378 let value = vector.get_ref(idx);
379 self.try_push_value_ref(&value)?;
380 }
381
382 Ok(())
383 }
384
385 fn push_null(&mut self) {
386 self.push_null_struct_value();
387 }
388}
389
390impl ScalarVectorBuilder for StructVectorBuilder {
391 type VectorType = StructVector;
392
393 fn with_capacity(_capacity: usize) -> Self {
394 panic!("Must use StructVectorBuilder::with_type_capacity()");
395 }
396
397 fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
398 self.try_push_value_ref(&value.map(ValueRef::Struct).unwrap_or(ValueRef::Null))
399 .unwrap_or_else(|e| {
400 panic!(
401 "Failed to push value, expect value type {:?}, err:{}",
402 self.fields, e
403 );
404 });
405 }
406
407 fn finish(&mut self) -> Self::VectorType {
408 let arrays = self
409 .value_builders
410 .iter_mut()
411 .map(|b| b.to_vector().to_arrow_array())
412 .collect();
413 let struct_array = StructArray::new(
414 self.fields.as_arrow_fields(),
415 arrays,
416 self.null_buffer.finish(),
417 );
418
419 StructVector::try_new(self.fields.clone(), struct_array).unwrap()
420 }
421
422 fn finish_cloned(&self) -> Self::VectorType {
423 let arrays = self
424 .value_builders
425 .iter()
426 .map(|b| b.to_vector_cloned().to_arrow_array())
427 .collect();
428
429 let struct_array = StructArray::new(
430 self.fields.as_arrow_fields(),
431 arrays,
432 self.null_buffer.finish_cloned(),
433 );
434 StructVector::try_new(self.fields.clone(), struct_array).unwrap()
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::StructField;
    use crate::value::ListValue;
    use crate::value::tests::*;

    /// Pushes ten struct values and five nulls, then checks type, length,
    /// null accounting, iteration and row materialization.
    #[test]
    fn test_struct_vector_builder() {
        let struct_type = build_struct_type();

        let mut builder = StructVectorBuilder::with_type_and_capacity(struct_type.clone(), 20);
        for _ in 0..10 {
            let value = build_struct_value();
            builder.push(Some(StructValueRef::Ref(&value)));
        }
        builder.push_nulls(5);

        let vector = builder.finish();
        assert_eq!(
            vector.data_type(),
            ConcreteDataType::struct_datatype(struct_type.clone())
        );
        assert_eq!(vector.len(), 15);
        assert_eq!(vector.null_count(), 5);

        // Every non-null row carries the expected struct type; count the rest.
        let mut null_count = 0;
        for item in vector.iter_data() {
            match &item {
                Some(value) => assert_eq!(value.struct_type(), &struct_type),
                None => null_count += 1,
            }
        }
        assert_eq!(5, null_count);

        let Value::Struct(struct_value) = vector.get(2) else {
            panic!("Expected a struct value");
        };
        assert_eq!(struct_value.struct_type(), &struct_type);

        let expected = vec![
            Value::Int32(1),
            Value::String("tom".into()),
            Value::UInt8(25),
            Value::String("94038".into()),
            Value::List(build_list_value()),
        ];
        assert_eq!(struct_value.items().len(), expected.len());
        for (actual, wanted) in struct_value.items().iter().zip(expected.iter()) {
            assert_eq!(actual, wanted);
        }
    }

    /// Round-trips a struct whose single field is a list of structs.
    #[test]
    fn test_deep_nested_struct_list() {
        let element_type = Arc::new(ConcreteDataType::struct_datatype(build_struct_type()));
        let element = build_struct_value();

        let list_type = ConcreteDataType::list_datatype(element_type.clone());
        let list_value = ListValue::new(
            vec![Value::Struct(element.clone()), Value::Struct(element.clone())],
            element_type.clone(),
        );

        let root_field = StructField::new("items".to_string(), list_type, false);
        let root_type = StructType::new(Arc::new(vec![root_field]));
        let root_value = StructValue::new(vec![Value::List(list_value)], root_type.clone());

        let mut builder = StructVectorBuilder::with_type_and_capacity(root_type.clone(), 20);
        builder.push(Some(StructValueRef::Ref(&root_value)));
        let vector = builder.finish();

        assert_eq!(vector.len(), 1);
        assert_eq!(vector.null_count(), 0);
        assert_eq!(
            vector.data_type(),
            ConcreteDataType::struct_datatype(root_type)
        );
        assert_eq!(vector.get(0), Value::Struct(root_value));
    }
}