datatypes/vectors/
struct_vector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use arrow::array::NullBufferBuilder;
19use arrow::compute::TakeOptions;
20use arrow::datatypes::DataType as ArrowDataType;
21use arrow_array::{Array, ArrayRef, StructArray};
22use datafusion_common::ScalarValue;
23use snafu::{ResultExt, ensure};
24
25use crate::error::{
26    ArrowComputeSnafu, ConversionSnafu, Error, InconsistentStructFieldsAndItemsSnafu, Result,
27    SerializeSnafu, UnsupportedOperationSnafu,
28};
29use crate::prelude::{ConcreteDataType, DataType, ScalarVector, ScalarVectorBuilder};
30use crate::serialize::Serializable;
31use crate::types::StructType;
32use crate::value::{StructValue, StructValueRef, Value, ValueRef};
33use crate::vectors::operations::VectorOp;
34use crate::vectors::{self, Helper, MutableVector, Validity, Vector, VectorRef};
35
36/// A simple wrapper around `StructArray` to represent a vector of structs in GreptimeDB.
37#[derive(Debug, PartialEq)]
38pub struct StructVector {
39    array: StructArray,
40    fields: StructType,
41}
42
43impl StructVector {
44    pub fn try_new(fields: StructType, array: StructArray) -> Result<Self> {
45        ensure!(
46            fields.fields().len() == array.fields().len(),
47            InconsistentStructFieldsAndItemsSnafu {
48                field_len: fields.fields().len(),
49                item_len: array.fields().len(),
50            }
51        );
52        Ok(StructVector { array, fields })
53    }
54
55    pub fn array(&self) -> &StructArray {
56        &self.array
57    }
58
59    pub fn as_arrow(&self) -> &dyn Array {
60        &self.array
61    }
62
63    pub fn struct_type(&self) -> &StructType {
64        &self.fields
65    }
66}
67
68impl Vector for StructVector {
69    fn data_type(&self) -> ConcreteDataType {
70        ConcreteDataType::struct_datatype(self.fields.clone())
71    }
72
73    fn vector_type_name(&self) -> String {
74        "StructVector".to_string()
75    }
76
77    fn as_any(&self) -> &dyn std::any::Any {
78        self
79    }
80
81    fn len(&self) -> usize {
82        self.array.len()
83    }
84
85    fn to_arrow_array(&self) -> ArrayRef {
86        Arc::new(self.array.clone())
87    }
88
89    fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
90        Box::new(self.array.clone())
91    }
92
93    fn validity(&self) -> Validity {
94        vectors::impl_validity_for_vector!(self.array)
95    }
96
97    fn memory_size(&self) -> usize {
98        self.array.get_buffer_memory_size()
99    }
100
101    fn null_count(&self) -> usize {
102        self.array.null_count()
103    }
104
105    fn is_null(&self, row: usize) -> bool {
106        self.array.is_null(row)
107    }
108
109    fn slice(&self, offset: usize, length: usize) -> VectorRef {
110        Arc::new(StructVector {
111            array: self.array.slice(offset, length),
112            fields: self.fields.clone(),
113        })
114    }
115
116    fn get(&self, index: usize) -> Value {
117        if !self.array.is_valid(index) {
118            return Value::Null;
119        }
120
121        let values = (0..self.fields.fields().len())
122            .map(|i| {
123                let field_array = &self.array.column(i);
124
125                if field_array.is_null(i) {
126                    Value::Null
127                } else {
128                    let scalar_value = ScalarValue::try_from_array(field_array, index).unwrap();
129                    Value::try_from(scalar_value).unwrap()
130                }
131            })
132            .collect();
133
134        Value::Struct(StructValue::try_new(values, self.fields.clone()).unwrap())
135    }
136
137    fn get_ref(&self, index: usize) -> ValueRef<'_> {
138        ValueRef::Struct(StructValueRef::Indexed {
139            vector: self,
140            idx: index,
141        })
142    }
143}
144
145impl VectorOp for StructVector {
146    fn replicate(&self, offsets: &[usize]) -> VectorRef {
147        let column_arrays = self
148            .array
149            .columns()
150            .iter()
151            .map(|col| {
152                let vector = Helper::try_into_vector(col)
153                    .expect("Failed to replicate struct vector columns");
154                vector.replicate(offsets).to_arrow_array()
155            })
156            .collect::<Vec<_>>();
157        let replicated_array = StructArray::new(
158            self.array.fields().clone(),
159            column_arrays,
160            self.array.nulls().cloned(),
161        );
162        Arc::new(StructVector::try_new(self.fields.clone(), replicated_array).unwrap())
163    }
164
165    fn cast(&self, _to_type: &ConcreteDataType) -> Result<VectorRef> {
166        UnsupportedOperationSnafu {
167            op: "cast",
168            vector_type: self.vector_type_name(),
169        }
170        .fail()
171    }
172
173    fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
174        let filtered =
175            datafusion_common::arrow::compute::filter(&self.array, filter.as_boolean_array())
176                .context(ArrowComputeSnafu)
177                .and_then(Helper::try_into_vector)?;
178        Ok(filtered)
179    }
180
181    fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
182        let take_result = datafusion_common::arrow::compute::take(
183            &self.array,
184            indices.as_arrow(),
185            Some(TakeOptions { check_bounds: true }),
186        )
187        .context(ArrowComputeSnafu)
188        .and_then(Helper::try_into_vector)?;
189        Ok(take_result)
190    }
191}
192
193impl Serializable for StructVector {
194    fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
195        let vectors = self
196            .array
197            .columns()
198            .iter()
199            .map(|value_array| Helper::try_into_vector(value_array))
200            .collect::<Result<Vec<_>>>()?;
201
202        (0..self.array.len())
203            .map(|idx| {
204                let mut result = serde_json::Map::with_capacity(vectors.len());
205                for (field, vector) in self.fields.fields().iter().zip(vectors.iter()) {
206                    let field_value = vector.get(idx);
207                    result.insert(
208                        field.name().to_string(),
209                        field_value.try_into().context(SerializeSnafu)?,
210                    );
211                }
212                Ok(result.into())
213            })
214            .collect::<Result<Vec<serde_json::Value>>>()
215    }
216}
217
218impl TryFrom<StructArray> for StructVector {
219    type Error = Error;
220
221    fn try_from(array: StructArray) -> Result<Self> {
222        let fields = match array.data_type() {
223            ArrowDataType::Struct(fields) => StructType::try_from(fields)?,
224            other => ConversionSnafu {
225                from: other.to_string(),
226            }
227            .fail()?,
228        };
229        Ok(Self { array, fields })
230    }
231}
232
233impl ScalarVector for StructVector {
234    type OwnedItem = StructValue;
235    type RefItem<'a> = StructValueRef<'a>;
236    type Iter<'a> = StructIter<'a>;
237    type Builder = StructVectorBuilder;
238
239    fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
240        if self.array.is_valid(idx) {
241            Some(StructValueRef::Indexed { vector: self, idx })
242        } else {
243            None
244        }
245    }
246
247    fn iter_data(&self) -> Self::Iter<'_> {
248        StructIter::new(self)
249    }
250}
251
252pub struct StructIter<'a> {
253    vector: &'a StructVector,
254    index: usize,
255}
256
257impl<'a> StructIter<'a> {
258    pub fn new(vector: &'a StructVector) -> Self {
259        Self { vector, index: 0 }
260    }
261}
262
263impl<'a> Iterator for StructIter<'a> {
264    type Item = Option<StructValueRef<'a>>;
265
266    fn next(&mut self) -> Option<Self::Item> {
267        if self.index < self.vector.len() {
268            let idx = self.index;
269            self.index += 1;
270
271            if self.vector.is_null(idx) {
272                Some(None)
273            } else {
274                let value = StructValueRef::Indexed {
275                    vector: self.vector,
276                    idx,
277                };
278
279                Some(Some(value))
280            }
281        } else {
282            None
283        }
284    }
285
286    fn size_hint(&self) -> (usize, Option<usize>) {
287        (self.vector.len(), Some(self.vector.len()))
288    }
289}
290
291pub struct StructVectorBuilder {
292    value_builders: Vec<Box<dyn MutableVector>>,
293    null_buffer: NullBufferBuilder,
294    fields: StructType,
295}
296
297impl StructVectorBuilder {
298    pub fn with_type_and_capacity(fields: StructType, capacity: usize) -> Self {
299        let value_builders = fields
300            .fields()
301            .iter()
302            .map(|f| f.data_type().create_mutable_vector(capacity))
303            .collect();
304        Self {
305            value_builders,
306            null_buffer: NullBufferBuilder::new(capacity),
307            fields,
308        }
309    }
310
311    fn push_struct_value(&mut self, struct_value: &StructValue) -> Result<()> {
312        for (index, value) in struct_value.items().iter().enumerate() {
313            self.value_builders[index].try_push_value_ref(&value.as_value_ref())?;
314        }
315        self.null_buffer.append_non_null();
316
317        Ok(())
318    }
319
320    fn push_null_struct_value(&mut self) {
321        for builder in &mut self.value_builders {
322            builder.push_null();
323        }
324        self.null_buffer.append_null();
325    }
326}
327
328impl MutableVector for StructVectorBuilder {
329    fn data_type(&self) -> ConcreteDataType {
330        ConcreteDataType::struct_datatype(self.fields.clone())
331    }
332
333    fn len(&self) -> usize {
334        self.null_buffer.len()
335    }
336
337    fn as_any(&self) -> &dyn Any {
338        self
339    }
340
341    fn as_mut_any(&mut self) -> &mut dyn Any {
342        self
343    }
344
345    fn to_vector(&mut self) -> VectorRef {
346        Arc::new(self.finish())
347    }
348
349    fn to_vector_cloned(&self) -> VectorRef {
350        Arc::new(self.finish_cloned())
351    }
352
353    fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
354        if let Some(struct_ref) = value.try_into_struct()? {
355            match struct_ref {
356                StructValueRef::Indexed { vector, idx } => match vector.get(idx).as_struct()? {
357                    Some(struct_value) => self.push_struct_value(struct_value)?,
358                    None => self.push_null(),
359                },
360                StructValueRef::Ref(val) => self.push_struct_value(val)?,
361                StructValueRef::RefList { val, fields } => {
362                    let struct_value = StructValue::try_new(
363                        val.iter().map(|v| Value::from(v.clone())).collect(),
364                        fields.clone(),
365                    )?;
366                    self.push_struct_value(&struct_value)?;
367                }
368            }
369        } else {
370            self.push_null();
371        }
372
373        Ok(())
374    }
375
376    fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
377        for idx in offset..offset + length {
378            let value = vector.get_ref(idx);
379            self.try_push_value_ref(&value)?;
380        }
381
382        Ok(())
383    }
384
385    fn push_null(&mut self) {
386        self.push_null_struct_value();
387    }
388}
389
390impl ScalarVectorBuilder for StructVectorBuilder {
391    type VectorType = StructVector;
392
393    fn with_capacity(_capacity: usize) -> Self {
394        panic!("Must use StructVectorBuilder::with_type_capacity()");
395    }
396
397    fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
398        self.try_push_value_ref(&value.map(ValueRef::Struct).unwrap_or(ValueRef::Null))
399            .unwrap_or_else(|e| {
400                panic!(
401                    "Failed to push value, expect value type {:?}, err:{}",
402                    self.fields, e
403                );
404            });
405    }
406
407    fn finish(&mut self) -> Self::VectorType {
408        let arrays = self
409            .value_builders
410            .iter_mut()
411            .map(|b| b.to_vector().to_arrow_array())
412            .collect();
413        let struct_array = StructArray::new(
414            self.fields.as_arrow_fields(),
415            arrays,
416            self.null_buffer.finish(),
417        );
418
419        StructVector::try_new(self.fields.clone(), struct_array).unwrap()
420    }
421
422    fn finish_cloned(&self) -> Self::VectorType {
423        let arrays = self
424            .value_builders
425            .iter()
426            .map(|b| b.to_vector_cloned().to_arrow_array())
427            .collect();
428
429        let struct_array = StructArray::new(
430            self.fields.as_arrow_fields(),
431            arrays,
432            self.null_buffer.finish_cloned(),
433        );
434        StructVector::try_new(self.fields.clone(), struct_array).unwrap()
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::StructField;
    use crate::value::ListValue;
    use crate::value::tests::*;

    #[test]
    fn test_struct_vector_builder() {
        let struct_type = build_struct_type();

        // Ten identical non-null rows followed by five null rows.
        let mut builder = StructVectorBuilder::with_type_and_capacity(struct_type.clone(), 20);
        for _ in 0..10 {
            let row = build_struct_value();
            builder.push(Some(StructValueRef::Ref(&row)));
        }
        builder.push_nulls(5);

        let vector = builder.finish();
        assert_eq!(
            vector.data_type(),
            ConcreteDataType::struct_datatype(struct_type.clone())
        );
        assert_eq!(vector.len(), 15);
        assert_eq!(vector.null_count(), 5);

        let mut nulls_seen = 0;
        for item in vector.iter_data() {
            match item {
                Some(row) => assert_eq!(row.struct_type(), &struct_type),
                None => nulls_seen += 1,
            }
        }
        assert_eq!(5, nulls_seen);

        let Value::Struct(struct_value) = vector.get(2) else {
            panic!("Expected a struct value");
        };
        assert_eq!(struct_value.struct_type(), &struct_type);
        let expected = [
            Value::Int32(1),
            Value::String("tom".into()),
            Value::UInt8(25),
            Value::String("94038".into()),
            Value::List(build_list_value()),
        ];
        let actual: Vec<&Value> = struct_value.items().iter().collect();
        let expected: Vec<&Value> = expected.iter().collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_deep_nested_struct_list() {
        // Innermost level: a struct type and one value of it.
        let inner_type = Arc::new(ConcreteDataType::struct_datatype(build_struct_type()));
        let inner_value = build_struct_value();

        // Middle level: a list holding two copies of that struct.
        let list_type = ConcreteDataType::list_datatype(inner_type.clone());
        let list_value = ListValue::new(vec![Value::Struct(inner_value); 2], inner_type);

        // Outer level: a struct with a single list-typed field.
        let root_type = StructType::new(Arc::new(vec![StructField::new(
            "items".to_string(),
            list_type,
            false,
        )]));
        let root_value = StructValue::new(vec![Value::List(list_value)], root_type.clone());

        let mut builder = StructVectorBuilder::with_type_and_capacity(root_type.clone(), 20);
        builder.push(Some(StructValueRef::Ref(&root_value)));
        let vector = builder.finish();

        assert_eq!(vector.len(), 1);
        assert_eq!(vector.null_count(), 0);
        assert_eq!(
            vector.data_type(),
            ConcreteDataType::struct_datatype(root_type)
        );
        assert_eq!(vector.get(0), Value::Struct(root_value));
    }
}