mito2/memtable/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Builders for time-series memtable
16
17use std::sync::Arc;
18
19use datatypes::arrow;
20use datatypes::arrow::array::{
21    Array, ArrayDataBuilder, BufferBuilder, GenericByteArray, NullBufferBuilder, UInt8BufferBuilder,
22};
23use datatypes::arrow_array::StringArray;
24use datatypes::data_type::DataType;
25use datatypes::prelude::{ConcreteDataType, MutableVector, VectorRef};
26use datatypes::value::ValueRef;
27use datatypes::vectors::StringVector;
28
29/// Field builder with special implementation for strings.
30pub(crate) enum FieldBuilder {
31    String(StringBuilder),
32    Other(Box<dyn MutableVector>),
33}
34
35impl FieldBuilder {
36    /// Creates a [FieldBuilder] instance with given type and capacity.
37    pub fn create(data_type: &ConcreteDataType, init_cap: usize) -> Self {
38        if let ConcreteDataType::String(_) = data_type {
39            Self::String(StringBuilder::with_capacity(init_cap / 16, init_cap))
40        } else {
41            Self::Other(data_type.create_mutable_vector(init_cap))
42        }
43    }
44
45    /// Pushes a value into builder.
46    pub(crate) fn push(&mut self, value: ValueRef) -> datatypes::error::Result<()> {
47        match self {
48            FieldBuilder::String(b) => {
49                if let Some(s) = value.as_string()? {
50                    b.append(s);
51                } else {
52                    b.append_null();
53                }
54                Ok(())
55            }
56            FieldBuilder::Other(b) => b.try_push_value_ref(value),
57        }
58    }
59
60    /// Push n null values into builder.
61    pub(crate) fn push_nulls(&mut self, n: usize) {
62        match self {
63            FieldBuilder::String(s) => {
64                s.append_n_nulls(n);
65            }
66            FieldBuilder::Other(v) => {
67                v.push_nulls(n);
68            }
69        }
70    }
71
72    /// Finishes builder and builder a [VectorRef].
73    pub(crate) fn finish(&mut self) -> VectorRef {
74        match self {
75            FieldBuilder::String(s) => Arc::new(StringVector::from(s.build())) as _,
76            FieldBuilder::Other(v) => v.to_vector(),
77        }
78    }
79}
80
81/// [StringBuilder] serves as a workaround for lacking [`GenericStringBuilder::append_array`](https://docs.rs/arrow-array/latest/arrow_array/builder/type.GenericStringBuilder.html#method.append_array)
82/// which is only available since arrow-rs 55.0.0.
83pub(crate) struct StringBuilder {
84    value_builder: UInt8BufferBuilder,
85    offsets_builder: BufferBuilder<i32>,
86    null_buffer_builder: NullBufferBuilder,
87}
88
89impl Default for StringBuilder {
90    fn default() -> Self {
91        Self::with_capacity(16, 256)
92    }
93}
94
95impl StringBuilder {
96    /// Creates a new [`GenericByteBuilder`].
97    ///
98    /// - `item_capacity` is the number of items to pre-allocate.
99    ///   The size of the preallocated buffer of offsets is the number of items plus one.
100    /// - `data_capacity` is the total number of bytes of data to pre-allocate
101    ///   (for all items, not per item).
102    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
103        let mut offsets_builder = BufferBuilder::<i32>::new(item_capacity + 1);
104        offsets_builder.append(0);
105        Self {
106            value_builder: UInt8BufferBuilder::new(data_capacity),
107            offsets_builder,
108            null_buffer_builder: NullBufferBuilder::new(item_capacity),
109        }
110    }
111
112    pub fn append(&mut self, data: &str) {
113        self.value_builder.append_slice(data.as_bytes());
114        self.null_buffer_builder.append(true);
115        self.offsets_builder.append(self.next_offset());
116    }
117
118    #[inline]
119    fn next_offset(&self) -> i32 {
120        i32::try_from(self.value_builder.len()).expect("byte array offset overflow")
121    }
122
123    pub fn len(&self) -> usize {
124        self.null_buffer_builder.len()
125    }
126
127    /// Based on arrow-rs' GenericByteBuilder:
128    /// https://github.com/apache/arrow-rs/blob/7905545537c50590fdb4dc645e3e0130fce80b57/arrow-array/src/builder/generic_bytes_builder.rs#L135
129    pub fn append_array(&mut self, array: &StringArray) {
130        if array.len() == 0 {
131            return;
132        }
133
134        let offsets = array.offsets();
135
136        // If the offsets are contiguous, we can append them directly avoiding the need to align
137        // for example, when the first appended array is not sliced (starts at offset 0)
138        if self.next_offset() == offsets[0] {
139            self.offsets_builder.append_slice(&offsets[1..]);
140        } else {
141            // Shifting all the offsets
142            let shift: i32 = self.next_offset() - offsets[0];
143
144            // Creating intermediate offsets instead of pushing each offset is faster
145            // (even if we make MutableBuffer to avoid updating length on each push
146            //  and reserve the necessary capacity, it's still slower)
147            let mut intermediate = Vec::with_capacity(offsets.len() - 1);
148
149            for &offset in &offsets[1..] {
150                intermediate.push(offset + shift)
151            }
152
153            self.offsets_builder.append_slice(&intermediate);
154        }
155
156        // Append underlying values, starting from the first offset and ending at the last offset
157        self.value_builder.append_slice(
158            &array.values().as_slice()[offsets[0] as usize..offsets[array.len()] as usize],
159        );
160
161        if let Some(null_buffer) = array.nulls() {
162            let data: Vec<_> = null_buffer.inner().iter().collect();
163            self.null_buffer_builder.append_slice(&data);
164        } else {
165            self.null_buffer_builder.append_n_non_nulls(array.len());
166        }
167    }
168
169    pub fn append_null(&mut self) {
170        self.null_buffer_builder.append(false);
171        self.offsets_builder.append(self.next_offset());
172    }
173
174    pub fn append_n_nulls(&mut self, n: usize) {
175        self.null_buffer_builder.append_n_nulls(n);
176        self.offsets_builder.append_n(n, self.next_offset());
177    }
178
179    pub fn build(&mut self) -> StringArray {
180        let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
181            .len(self.len())
182            .add_buffer(self.offsets_builder.finish())
183            .add_buffer(self.value_builder.finish())
184            .nulls(self.null_buffer_builder.finish());
185
186        self.offsets_builder.append(self.next_offset());
187        let array_data = unsafe { array_builder.build_unchecked() };
188        GenericByteArray::from(array_data)
189    }
190}
191
/// Unit tests for [StringBuilder].
#[cfg(test)]
mod tests {
    use datatypes::arrow::array::StringArray;

    use super::*;

    /// Nulls and values can be interleaved freely.
    #[test]
    fn test_append() {
        let mut builder = StringBuilder::default();
        builder.append_n_nulls(10);
        let array = builder.build();
        assert_eq!(vec![None; 10], array.iter().collect::<Vec<_>>());

        let mut builder = StringBuilder::default();
        builder.append_n_nulls(3);
        builder.append("hello");
        builder.append_null();
        builder.append("world");
        assert_eq!(
            vec![None, None, None, Some("hello"), None, Some("world")],
            builder.build().iter().collect::<Vec<_>>()
        )
    }

    /// Empty strings are kept distinct from nulls.
    #[test]
    fn test_append_empty_string() {
        let mut builder = StringBuilder::default();
        builder.append("");
        builder.append_null();
        builder.append("");
        let array = builder.build();
        assert_eq!(
            vec![Some(""), None, Some("")],
            array.iter().collect::<Vec<_>>()
        );
    }

    /// A value larger than the default data capacity is stored intact.
    #[test]
    fn test_append_large_string() {
        let large_str = "a".repeat(1024);
        let mut builder = StringBuilder::default();
        builder.append(&large_str);
        let array = builder.build();
        assert_eq!(large_str.as_str(), array.value(0));
    }

    /// Appending an array to a non-empty builder shifts the incoming offsets.
    #[test]
    fn test_append_array() {
        let mut builder_1 = StringBuilder::default();
        builder_1.append("hello");
        builder_1.append_null();
        builder_1.append("world");

        let mut builder_2 = StringBuilder::default();
        builder_2.append_null();
        builder_2.append("!");
        builder_2.append_array(&builder_1.build());
        assert_eq!(
            vec![None, Some("!"), Some("hello"), None, Some("world")],
            builder_2.build().iter().collect::<Vec<_>>()
        )
    }

    /// Appending an empty array is a no-op.
    #[test]
    fn test_append_empty_array() {
        let mut builder = StringBuilder::default();
        builder.append_array(&StringArray::from(vec![] as Vec<&str>));
        let array = builder.build();
        assert_eq!(0, array.len());
    }

    /// A sliced array (non-zero first offset) is appended correctly.
    #[test]
    fn test_append_partial_array() {
        let source = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let sliced = source.slice(1, 2); // [None, Some("b")]

        let mut builder = StringBuilder::default();
        builder.append_array(&sliced);
        let array = builder.build();
        assert_eq!(vec![None, Some("b")], array.iter().collect::<Vec<_>>());
    }

    /// Filling a builder up to its pre-allocated item capacity works.
    #[test]
    fn test_builder_capacity() {
        let mut builder = StringBuilder::with_capacity(10, 100);
        assert_eq!(0, builder.len());

        for i in 0..10 {
            builder.append(&format!("string-{}", i));
        }

        let array = builder.build();
        assert_eq!(10, array.len());
        assert_eq!("string-0", array.value(0));
        assert_eq!("string-9", array.value(9));
    }

    /// `build()` resets the builder, so a second build only contains the items
    /// appended after the first one.
    #[test]
    fn test_builder_reset_after_build() {
        let mut builder = StringBuilder::default();
        builder.append("first");
        let array1 = builder.build();
        assert_eq!(1, array1.len());
        assert_eq!("first", array1.value(0));
        assert_eq!(0, builder.len());

        builder.append("second");
        let array2 = builder.build();
        // 1, not 2: the first build() already drained "first" from the builder.
        assert_eq!(1, array2.len());
        assert_eq!("second", array2.value(0));
    }
}