mito2/memtable/
builder.rs1use std::sync::Arc;
18
19use datatypes::arrow;
20use datatypes::arrow::array::{
21 Array, ArrayDataBuilder, BufferBuilder, GenericByteArray, NullBufferBuilder, UInt8BufferBuilder,
22};
23use datatypes::arrow::buffer::Buffer;
24use datatypes::arrow_array::StringArray;
25use datatypes::data_type::DataType;
26use datatypes::prelude::{ConcreteDataType, MutableVector, VectorRef};
27use datatypes::value::ValueRef;
28use datatypes::vectors::StringVector;
29
30pub(crate) enum FieldBuilder {
32 String(StringBuilder),
33 Other(Box<dyn MutableVector>),
34}
35
36impl FieldBuilder {
37 pub fn create(data_type: &ConcreteDataType, init_cap: usize) -> Self {
39 if let ConcreteDataType::String(_) = data_type {
40 Self::String(StringBuilder::with_capacity(init_cap / 16, init_cap))
41 } else {
42 Self::Other(data_type.create_mutable_vector(init_cap))
43 }
44 }
45
46 pub(crate) fn push(&mut self, value: ValueRef) -> datatypes::error::Result<()> {
48 match self {
49 FieldBuilder::String(b) => {
50 if let Some(s) = value.as_string()? {
51 b.append(s);
52 } else {
53 b.append_null();
54 }
55 Ok(())
56 }
57 FieldBuilder::Other(b) => b.try_push_value_ref(value),
58 }
59 }
60
61 pub(crate) fn push_nulls(&mut self, n: usize) {
63 match self {
64 FieldBuilder::String(s) => {
65 s.append_n_nulls(n);
66 }
67 FieldBuilder::Other(v) => {
68 v.push_nulls(n);
69 }
70 }
71 }
72
73 pub(crate) fn finish(&mut self) -> VectorRef {
75 match self {
76 FieldBuilder::String(s) => Arc::new(StringVector::from(s.build())) as _,
77 FieldBuilder::Other(v) => v.to_vector(),
78 }
79 }
80
81 pub(crate) fn finish_cloned(&self) -> VectorRef {
83 match self {
84 FieldBuilder::String(s) => Arc::new(StringVector::from(s.build_cloned())) as _,
85 FieldBuilder::Other(v) => v.to_vector_cloned(),
86 }
87 }
88}
89
90pub(crate) struct StringBuilder {
93 value_builder: UInt8BufferBuilder,
94 offsets_builder: BufferBuilder<i32>,
95 null_buffer_builder: NullBufferBuilder,
96}
97
98impl Default for StringBuilder {
99 fn default() -> Self {
100 Self::with_capacity(16, 256)
101 }
102}
103
104impl StringBuilder {
105 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
112 let mut offsets_builder = BufferBuilder::<i32>::new(item_capacity + 1);
113 offsets_builder.append(0);
114 Self {
115 value_builder: UInt8BufferBuilder::new(data_capacity),
116 offsets_builder,
117 null_buffer_builder: NullBufferBuilder::new(item_capacity),
118 }
119 }
120
121 pub fn append(&mut self, data: &str) {
122 self.value_builder.append_slice(data.as_bytes());
123 self.null_buffer_builder.append(true);
124 self.offsets_builder.append(self.next_offset());
125 }
126
127 #[inline]
128 pub fn next_offset(&self) -> i32 {
129 i32::try_from(self.value_builder.len()).expect("byte array offset overflow")
130 }
131
132 pub fn len(&self) -> usize {
133 self.null_buffer_builder.len()
134 }
135
136 pub fn append_array(&mut self, array: &StringArray) {
139 if array.len() == 0 {
140 return;
141 }
142
143 let offsets = array.offsets();
144
145 if self.next_offset() == offsets[0] {
148 self.offsets_builder.append_slice(&offsets[1..]);
149 } else {
150 let shift: i32 = self.next_offset() - offsets[0];
152
153 let mut intermediate = Vec::with_capacity(offsets.len() - 1);
157
158 for &offset in &offsets[1..] {
159 intermediate.push(offset + shift)
160 }
161
162 self.offsets_builder.append_slice(&intermediate);
163 }
164
165 self.value_builder.append_slice(
167 &array.values().as_slice()[offsets[0] as usize..offsets[array.len()] as usize],
168 );
169
170 if let Some(null_buffer) = array.nulls() {
171 let data: Vec<_> = null_buffer.inner().iter().collect();
172 self.null_buffer_builder.append_slice(&data);
173 } else {
174 self.null_buffer_builder.append_n_non_nulls(array.len());
175 }
176 }
177
178 pub fn append_null(&mut self) {
179 self.null_buffer_builder.append(false);
180 self.offsets_builder.append(self.next_offset());
181 }
182
183 pub fn append_n_nulls(&mut self, n: usize) {
184 self.null_buffer_builder.append_n_nulls(n);
185 self.offsets_builder.append_n(n, self.next_offset());
186 }
187
188 pub fn build(&mut self) -> StringArray {
189 let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
190 .len(self.len())
191 .add_buffer(self.offsets_builder.finish())
192 .add_buffer(self.value_builder.finish())
193 .nulls(self.null_buffer_builder.finish());
194
195 self.offsets_builder.append(self.next_offset());
196 let array_data = unsafe { array_builder.build_unchecked() };
197 GenericByteArray::from(array_data)
198 }
199
200 pub fn build_cloned(&self) -> StringArray {
201 let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
202 let value_buffer = Buffer::from_slice_ref(self.value_builder.as_slice());
203 let array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
204 .len(self.len())
205 .add_buffer(offset_buffer)
206 .add_buffer(value_buffer)
207 .nulls(self.null_buffer_builder.finish_cloned());
208 let array_data = unsafe { array_builder.build_unchecked() };
209 GenericByteArray::from(array_data)
210 }
211}
212
#[cfg(test)]
mod tests {
    use datatypes::arrow::array::StringArray;

    use super::*;

    /// Collects every slot of `array` into a vector of optional strings.
    fn items(array: &StringArray) -> Vec<Option<&str>> {
        array.iter().collect()
    }

    /// Nulls and values can be interleaved freely.
    #[test]
    fn test_append() {
        let mut only_nulls = StringBuilder::default();
        only_nulls.append_n_nulls(10);
        let built = only_nulls.build();
        assert_eq!(vec![None; 10], items(&built));

        let mut mixed = StringBuilder::default();
        mixed.append_n_nulls(3);
        mixed.append("hello");
        mixed.append_null();
        mixed.append("world");
        let built = mixed.build();
        assert_eq!(
            vec![None, None, None, Some("hello"), None, Some("world")],
            items(&built)
        )
    }

    /// Empty strings are values, not nulls.
    #[test]
    fn test_append_empty_string() {
        let mut sb = StringBuilder::default();
        sb.append("");
        sb.append_null();
        sb.append("");
        let built = sb.build();
        assert_eq!(vec![Some(""), None, Some("")], items(&built));
    }

    /// A value larger than the default data capacity is stored intact.
    #[test]
    fn test_append_large_string() {
        let payload = "a".repeat(1024);
        let mut sb = StringBuilder::default();
        sb.append(&payload);
        let built = sb.build();
        assert_eq!(payload.as_str(), built.value(0));
    }

    /// Appending a whole array to a non-empty builder re-bases its offsets.
    #[test]
    fn test_append_array() {
        let mut source = StringBuilder::default();
        source.append("hello");
        source.append_null();
        source.append("world");

        let mut target = StringBuilder::default();
        target.append_null();
        target.append("!");
        target.append_array(&source.build());
        let built = target.build();
        assert_eq!(
            vec![None, Some("!"), Some("hello"), None, Some("world")],
            items(&built)
        )
    }

    /// Appending an empty array is a no-op.
    #[test]
    fn test_append_empty_array() {
        let empty = StringArray::from(vec![] as Vec<&str>);
        let mut sb = StringBuilder::default();
        sb.append_array(&empty);
        let built = sb.build();
        assert_eq!(0, built.len());
    }

    /// Only the sliced window of the source array is appended.
    #[test]
    fn test_append_partial_array() {
        let full = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let window = full.slice(1, 2);

        let mut sb = StringBuilder::default();
        sb.append_array(&window);
        let built = sb.build();
        assert_eq!(vec![None, Some("b")], items(&built));
    }

    /// A builder created with explicit capacities starts empty and keeps
    /// every appended value.
    #[test]
    fn test_builder_capacity() {
        let mut sb = StringBuilder::with_capacity(10, 100);
        assert_eq!(0, sb.len());

        (0..10).for_each(|i| sb.append(&format!("string-{}", i)));

        let built = sb.build();
        assert_eq!(10, built.len());
        assert_eq!("string-0", built.value(0));
        assert_eq!("string-9", built.value(9));
    }

    /// After `build` the builder starts over: the second array only holds
    /// what was appended after the first build.
    #[test]
    fn test_builder_reset_after_build() {
        let mut sb = StringBuilder::default();
        sb.append("first");
        let first = sb.build();
        assert_eq!(1, first.len());

        sb.append("second");
        let second = sb.build();
        assert_eq!(1, second.len());
    }
}