mito2/memtable/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Builders for time-series memtable
16
17use std::sync::Arc;
18
19use datatypes::arrow;
20use datatypes::arrow::array::{
21    Array, ArrayDataBuilder, BufferBuilder, GenericByteArray, NullBufferBuilder, UInt8BufferBuilder,
22};
23use datatypes::arrow::buffer::Buffer;
24use datatypes::arrow_array::StringArray;
25use datatypes::data_type::DataType;
26use datatypes::prelude::{ConcreteDataType, MutableVector, VectorRef};
27use datatypes::value::ValueRef;
28use datatypes::vectors::StringVector;
29
30/// Field builder with special implementation for strings.
31pub(crate) enum FieldBuilder {
32    String(StringBuilder),
33    Other(Box<dyn MutableVector>),
34}
35
36impl FieldBuilder {
37    /// Creates a [FieldBuilder] instance with given type and capacity.
38    pub fn create(data_type: &ConcreteDataType, init_cap: usize) -> Self {
39        if let ConcreteDataType::String(_) = data_type {
40            Self::String(StringBuilder::with_capacity(init_cap / 16, init_cap))
41        } else {
42            Self::Other(data_type.create_mutable_vector(init_cap))
43        }
44    }
45
46    /// Pushes a value into builder.
47    pub(crate) fn push(&mut self, value: ValueRef) -> datatypes::error::Result<()> {
48        match self {
49            FieldBuilder::String(b) => {
50                if let Some(s) = value.as_string()? {
51                    b.append(s);
52                } else {
53                    b.append_null();
54                }
55                Ok(())
56            }
57            FieldBuilder::Other(b) => b.try_push_value_ref(value),
58        }
59    }
60
61    /// Push n null values into builder.
62    pub(crate) fn push_nulls(&mut self, n: usize) {
63        match self {
64            FieldBuilder::String(s) => {
65                s.append_n_nulls(n);
66            }
67            FieldBuilder::Other(v) => {
68                v.push_nulls(n);
69            }
70        }
71    }
72
73    /// Finishes builder and builder a [VectorRef].
74    pub(crate) fn finish(&mut self) -> VectorRef {
75        match self {
76            FieldBuilder::String(s) => Arc::new(StringVector::from(s.build())) as _,
77            FieldBuilder::Other(v) => v.to_vector(),
78        }
79    }
80
81    /// Finishes builder and builder a [VectorRef].
82    pub(crate) fn finish_cloned(&self) -> VectorRef {
83        match self {
84            FieldBuilder::String(s) => Arc::new(StringVector::from(s.build_cloned())) as _,
85            FieldBuilder::Other(v) => v.to_vector_cloned(),
86        }
87    }
88}
89
90/// [StringBuilder] serves as a workaround for lacking [`GenericStringBuilder::append_array`](https://docs.rs/arrow-array/latest/arrow_array/builder/type.GenericStringBuilder.html#method.append_array)
91/// which is only available since arrow-rs 55.0.0.
92pub(crate) struct StringBuilder {
93    value_builder: UInt8BufferBuilder,
94    offsets_builder: BufferBuilder<i32>,
95    null_buffer_builder: NullBufferBuilder,
96}
97
98impl Default for StringBuilder {
99    fn default() -> Self {
100        Self::with_capacity(16, 256)
101    }
102}
103
104impl StringBuilder {
105    /// Creates a new [`GenericByteBuilder`].
106    ///
107    /// - `item_capacity` is the number of items to pre-allocate.
108    ///   The size of the preallocated buffer of offsets is the number of items plus one.
109    /// - `data_capacity` is the total number of bytes of data to pre-allocate
110    ///   (for all items, not per item).
111    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
112        let mut offsets_builder = BufferBuilder::<i32>::new(item_capacity + 1);
113        offsets_builder.append(0);
114        Self {
115            value_builder: UInt8BufferBuilder::new(data_capacity),
116            offsets_builder,
117            null_buffer_builder: NullBufferBuilder::new(item_capacity),
118        }
119    }
120
121    pub fn append(&mut self, data: &str) {
122        self.value_builder.append_slice(data.as_bytes());
123        self.null_buffer_builder.append(true);
124        self.offsets_builder.append(self.next_offset());
125    }
126
127    #[inline]
128    pub fn next_offset(&self) -> i32 {
129        i32::try_from(self.value_builder.len()).expect("byte array offset overflow")
130    }
131
132    pub fn len(&self) -> usize {
133        self.null_buffer_builder.len()
134    }
135
136    /// Based on arrow-rs' GenericByteBuilder:
137    /// https://github.com/apache/arrow-rs/blob/7905545537c50590fdb4dc645e3e0130fce80b57/arrow-array/src/builder/generic_bytes_builder.rs#L135
138    pub fn append_array(&mut self, array: &StringArray) {
139        if array.len() == 0 {
140            return;
141        }
142
143        let offsets = array.offsets();
144
145        // If the offsets are contiguous, we can append them directly avoiding the need to align
146        // for example, when the first appended array is not sliced (starts at offset 0)
147        if self.next_offset() == offsets[0] {
148            self.offsets_builder.append_slice(&offsets[1..]);
149        } else {
150            // Shifting all the offsets
151            let shift: i32 = self.next_offset() - offsets[0];
152
153            // Creating intermediate offsets instead of pushing each offset is faster
154            // (even if we make MutableBuffer to avoid updating length on each push
155            //  and reserve the necessary capacity, it's still slower)
156            let mut intermediate = Vec::with_capacity(offsets.len() - 1);
157
158            for &offset in &offsets[1..] {
159                intermediate.push(offset + shift)
160            }
161
162            self.offsets_builder.append_slice(&intermediate);
163        }
164
165        // Append underlying values, starting from the first offset and ending at the last offset
166        self.value_builder.append_slice(
167            &array.values().as_slice()[offsets[0] as usize..offsets[array.len()] as usize],
168        );
169
170        if let Some(null_buffer) = array.nulls() {
171            let data: Vec<_> = null_buffer.inner().iter().collect();
172            self.null_buffer_builder.append_slice(&data);
173        } else {
174            self.null_buffer_builder.append_n_non_nulls(array.len());
175        }
176    }
177
178    pub fn append_null(&mut self) {
179        self.null_buffer_builder.append(false);
180        self.offsets_builder.append(self.next_offset());
181    }
182
183    pub fn append_n_nulls(&mut self, n: usize) {
184        self.null_buffer_builder.append_n_nulls(n);
185        self.offsets_builder.append_n(n, self.next_offset());
186    }
187
188    pub fn build(&mut self) -> StringArray {
189        let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
190            .len(self.len())
191            .add_buffer(self.offsets_builder.finish())
192            .add_buffer(self.value_builder.finish())
193            .nulls(self.null_buffer_builder.finish());
194
195        self.offsets_builder.append(self.next_offset());
196        let array_data = unsafe { array_builder.build_unchecked() };
197        GenericByteArray::from(array_data)
198    }
199
200    pub fn build_cloned(&self) -> StringArray {
201        let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
202        let value_buffer = Buffer::from_slice_ref(self.value_builder.as_slice());
203        let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
204            .len(self.len())
205            .add_buffer(offset_buffer)
206            .add_buffer(value_buffer)
207            .nulls(self.null_buffer_builder.finish_cloned());
208        let array_data = unsafe { array_builder.build_unchecked() };
209        GenericByteArray::from(array_data)
210    }
211}
212
213#[cfg(test)]
214mod tests {
215    use datatypes::arrow::array::StringArray;
216
217    use super::*;
218
219    #[test]
220    fn test_append() {
221        let mut builder = StringBuilder::default();
222        builder.append_n_nulls(10);
223        let array = builder.build();
224        assert_eq!(vec![None; 10], array.iter().collect::<Vec<_>>());
225
226        let mut builder = StringBuilder::default();
227        builder.append_n_nulls(3);
228        builder.append("hello");
229        builder.append_null();
230        builder.append("world");
231        assert_eq!(
232            vec![None, None, None, Some("hello"), None, Some("world")],
233            builder.build().iter().collect::<Vec<_>>()
234        )
235    }
236
237    #[test]
238    fn test_append_empty_string() {
239        let mut builder = StringBuilder::default();
240        builder.append("");
241        builder.append_null();
242        builder.append("");
243        let array = builder.build();
244        assert_eq!(
245            vec![Some(""), None, Some("")],
246            array.iter().collect::<Vec<_>>()
247        );
248    }
249
250    #[test]
251    fn test_append_large_string() {
252        let large_str = "a".repeat(1024);
253        let mut builder = StringBuilder::default();
254        builder.append(&large_str);
255        let array = builder.build();
256        assert_eq!(large_str.as_str(), array.value(0));
257    }
258
259    #[test]
260    fn test_append_array() {
261        let mut builder_1 = StringBuilder::default();
262        builder_1.append("hello");
263        builder_1.append_null();
264        builder_1.append("world");
265
266        let mut builder_2 = StringBuilder::default();
267        builder_2.append_null();
268        builder_2.append("!");
269        builder_2.append_array(&builder_1.build());
270        assert_eq!(
271            vec![None, Some("!"), Some("hello"), None, Some("world")],
272            builder_2.build().iter().collect::<Vec<_>>()
273        )
274    }
275
276    #[test]
277    fn test_append_empty_array() {
278        let mut builder = StringBuilder::default();
279        builder.append_array(&StringArray::from(vec![] as Vec<&str>));
280        let array = builder.build();
281        assert_eq!(0, array.len());
282    }
283
284    #[test]
285    fn test_append_partial_array() {
286        let source = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
287        let sliced = source.slice(1, 2); // [None, Some("b")]
288
289        let mut builder = StringBuilder::default();
290        builder.append_array(&sliced);
291        let array = builder.build();
292        assert_eq!(vec![None, Some("b")], array.iter().collect::<Vec<_>>());
293    }
294
295    #[test]
296    fn test_builder_capacity() {
297        let mut builder = StringBuilder::with_capacity(10, 100);
298        assert_eq!(0, builder.len());
299
300        for i in 0..10 {
301            builder.append(&format!("string-{}", i));
302        }
303
304        let array = builder.build();
305        assert_eq!(10, array.len());
306        assert_eq!("string-0", array.value(0));
307        assert_eq!("string-9", array.value(9));
308    }
309
310    #[test]
311    fn test_builder_reset_after_build() {
312        let mut builder = StringBuilder::default();
313        builder.append("first");
314        let array1 = builder.build();
315        assert_eq!(1, array1.len());
316
317        builder.append("second");
318        let array2 = builder.build();
319        assert_eq!(1, array2.len()); // Not 2 because build() doesn't reset
320    }
321}