datatypes/vectors/
struct_vector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow::compute::TakeOptions;
18use arrow_array::{Array, ArrayRef, StructArray};
19use serde_json::Value as JsonValue;
20use snafu::ResultExt;
21
22use crate::error::{self, ArrowComputeSnafu, Result, UnsupportedOperationSnafu};
23use crate::prelude::ConcreteDataType;
24use crate::serialize::Serializable;
25use crate::value::{Value, ValueRef};
26use crate::vectors::operations::VectorOp;
27use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
28
/// A simple wrapper around `StructArray` to represent a vector of structs in GreptimeDB.
///
/// The arrow array is stored together with the GreptimeDB data type describing
/// it, so `data_type()` can hand out the type without re-deriving it from the
/// arrow fields on every call.
#[derive(Debug, PartialEq)]
pub struct StructVector {
    /// The wrapped arrow struct array holding the child columns and the null buffer.
    array: StructArray,
    /// The `ConcreteDataType::Struct` built from `array`'s fields in `StructVector::new`.
    data_type: ConcreteDataType,
}
35
36#[allow(unused)]
37impl StructVector {
38    pub fn new(array: StructArray) -> Result<Self> {
39        let fields = array.fields();
40        let data_type = ConcreteDataType::Struct(fields.try_into()?);
41        Ok(StructVector { array, data_type })
42    }
43
44    pub fn array(&self) -> &StructArray {
45        &self.array
46    }
47
48    pub fn as_arrow(&self) -> &dyn Array {
49        &self.array
50    }
51}
52
53impl Vector for StructVector {
54    fn data_type(&self) -> ConcreteDataType {
55        self.data_type.clone()
56    }
57
58    fn vector_type_name(&self) -> String {
59        "StructVector".to_string()
60    }
61
62    fn as_any(&self) -> &dyn std::any::Any {
63        self
64    }
65
66    fn len(&self) -> usize {
67        self.array.len()
68    }
69
70    fn to_arrow_array(&self) -> ArrayRef {
71        Arc::new(self.array.clone())
72    }
73
74    fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
75        Box::new(self.array.clone())
76    }
77
78    fn validity(&self) -> Validity {
79        vectors::impl_validity_for_vector!(self.array)
80    }
81
82    fn memory_size(&self) -> usize {
83        self.array.get_buffer_memory_size()
84    }
85
86    fn null_count(&self) -> usize {
87        self.array.null_count()
88    }
89
90    fn is_null(&self, row: usize) -> bool {
91        self.array.is_null(row)
92    }
93
94    fn slice(&self, offset: usize, length: usize) -> VectorRef {
95        Arc::new(StructVector {
96            array: self.array.slice(offset, length),
97            data_type: self.data_type.clone(),
98        })
99    }
100
101    fn get(&self, _: usize) -> Value {
102        unimplemented!("StructValue not supported yet")
103    }
104
105    fn get_ref(&self, _: usize) -> ValueRef {
106        unimplemented!("StructValue not supported yet")
107    }
108}
109
110impl VectorOp for StructVector {
111    fn replicate(&self, offsets: &[usize]) -> VectorRef {
112        let column_arrays = self
113            .array
114            .columns()
115            .iter()
116            .map(|col| {
117                let vector = Helper::try_into_vector(col)
118                    .expect("Failed to replicate struct vector columns");
119                vector.replicate(offsets).to_arrow_array()
120            })
121            .collect::<Vec<_>>();
122        let replicated_array = StructArray::new(
123            self.array.fields().clone(),
124            column_arrays,
125            self.array.nulls().cloned(),
126        );
127        Arc::new(
128            StructVector::new(replicated_array).expect("Failed to create replicated StructVector"),
129        )
130    }
131
132    fn cast(&self, _to_type: &ConcreteDataType) -> Result<VectorRef> {
133        UnsupportedOperationSnafu {
134            op: "cast",
135            vector_type: self.vector_type_name(),
136        }
137        .fail()
138    }
139
140    fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
141        let filtered =
142            datafusion_common::arrow::compute::filter(&self.array, filter.as_boolean_array())
143                .context(ArrowComputeSnafu)
144                .and_then(Helper::try_into_vector)?;
145        Ok(filtered)
146    }
147
148    fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
149        let take_result = datafusion_common::arrow::compute::take(
150            &self.array,
151            indices.as_arrow(),
152            Some(TakeOptions { check_bounds: true }),
153        )
154        .context(ArrowComputeSnafu)
155        .and_then(Helper::try_into_vector)?;
156        Ok(take_result)
157    }
158}
159
160impl Serializable for StructVector {
161    fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
162        let mut result = serde_json::Map::new();
163        for (field, value) in self.array.fields().iter().zip(self.array.columns().iter()) {
164            let value_vector = Helper::try_into_vector(value)?;
165
166            let field_value = value_vector.serialize_to_json()?;
167            result.insert(field.name().clone(), JsonValue::Array(field_value));
168        }
169        let fields = JsonValue::Object(result);
170        let data_type = serde_json::to_value(&self.data_type).context(error::SerializeSnafu)?;
171        Ok(vec![JsonValue::Object(
172            [
173                ("fields".to_string(), fields),
174                ("data_type".to_string(), data_type),
175            ]
176            .iter()
177            .cloned()
178            .collect(),
179        )])
180    }
181}