// common_recordbatch/recordbatch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::json::array::JsonArray;
28use datatypes::vectors::{Helper, VectorRef};
29use serde::ser::{Error, SerializeStruct};
30use serde::{Serialize, Serializer};
31use snafu::{OptionExt, ResultExt, ensure};
32
33use crate::DfRecordBatch;
34use crate::error::{
35    self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
36    Result,
37};
38
/// A two-dimensional batch of column-oriented data with a defined schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    /// The GreptimeDB schema describing the columns of this batch.
    pub schema: SchemaRef,
    /// The underlying DataFusion/Arrow record batch that owns the column data.
    /// Kept private so it can only be set through the constructors, which
    /// align the arrays with `schema`.
    df_record_batch: DfRecordBatch,
}
45
46impl RecordBatch {
47    /// Create a new [`RecordBatch`] from `schema` and `columns`.
48    pub fn new<I: IntoIterator<Item = VectorRef>>(
49        schema: SchemaRef,
50        columns: I,
51    ) -> Result<RecordBatch> {
52        let columns: Vec<_> = columns.into_iter().collect();
53        let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
54
55        // Casting the arrays here to match the schema, is a temporary solution to support Arrow's
56        // view array types (`StringViewArray` and `BinaryViewArray`).
57        // As to "support": the arrays here are created from vectors, which do not have types
58        // corresponding to view arrays. What we can do is to only cast them.
59        // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
60        // the casting here will be removed in the end.
61        // TODO(LFC): Remove the casting here once `Batch` is no longer used.
62        let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
63
64        let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
65
66        let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
67            .context(error::NewDfRecordBatchSnafu)?;
68
69        Ok(RecordBatch {
70            schema,
71            df_record_batch,
72        })
73    }
74
75    pub fn to_df_record_batch<I: IntoIterator<Item = VectorRef>>(
76        arrow_schema: ArrowSchemaRef,
77        columns: I,
78    ) -> Result<DfRecordBatch> {
79        let columns: Vec<_> = columns.into_iter().collect();
80        let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
81
82        // Casting the arrays here to match the schema, is a temporary solution to support Arrow's
83        // view array types (`StringViewArray` and `BinaryViewArray`).
84        // As to "support": the arrays here are created from vectors, which do not have types
85        // corresponding to view arrays. What we can do is to only cast them.
86        // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
87        // the casting here will be removed in the end.
88        // TODO(LFC): Remove the casting here once `Batch` is no longer used.
89        let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?;
90
91        let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?;
92
93        let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays)
94            .context(error::NewDfRecordBatchSnafu)?;
95
96        Ok(df_record_batch)
97    }
98
99    fn cast_view_arrays(
100        schema: &ArrowSchemaRef,
101        mut arrays: Vec<ArrayRef>,
102    ) -> Result<Vec<ArrayRef>> {
103        for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
104            let expected = f.data_type();
105            let actual = a.data_type();
106            if matches!(
107                (expected, actual),
108                (ArrowDataType::Utf8View, ArrowDataType::Utf8)
109                    | (ArrowDataType::BinaryView, ArrowDataType::Binary)
110            ) {
111                *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
112            }
113        }
114        Ok(arrays)
115    }
116
117    /// Create an empty [`RecordBatch`] from `schema`.
118    pub fn new_empty(schema: SchemaRef) -> RecordBatch {
119        let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
120        RecordBatch {
121            schema,
122            df_record_batch,
123        }
124    }
125
126    /// Create an empty [`RecordBatch`] from `schema` with `num_rows`.
127    pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
128        let df_record_batch = DfRecordBatch::try_new_with_options(
129            schema.arrow_schema().clone(),
130            vec![],
131            &RecordBatchOptions::new().with_row_count(Some(num_rows)),
132        )
133        .context(error::NewDfRecordBatchSnafu)?;
134        Ok(RecordBatch {
135            schema,
136            df_record_batch,
137        })
138    }
139
140    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
141        let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
142        let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
143            ProjectArrowRecordBatchSnafu {
144                schema: self.schema.clone(),
145                projection: indices.to_vec(),
146            }
147        })?;
148
149        Ok(Self {
150            schema,
151            df_record_batch,
152        })
153    }
154
155    /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`.
156    ///
157    /// This method doesn't check the schema.
158    pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
159        RecordBatch {
160            schema,
161            df_record_batch,
162        }
163    }
164
165    #[inline]
166    pub fn df_record_batch(&self) -> &DfRecordBatch {
167        &self.df_record_batch
168    }
169
170    #[inline]
171    pub fn into_df_record_batch(self) -> DfRecordBatch {
172        self.df_record_batch
173    }
174
175    #[inline]
176    pub fn columns(&self) -> &[ArrayRef] {
177        self.df_record_batch.columns()
178    }
179
180    #[inline]
181    pub fn column(&self, idx: usize) -> &ArrayRef {
182        self.df_record_batch.column(idx)
183    }
184
185    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
186        self.df_record_batch.column_by_name(name)
187    }
188
189    #[inline]
190    pub fn num_columns(&self) -> usize {
191        self.df_record_batch.num_columns()
192    }
193
194    #[inline]
195    pub fn num_rows(&self) -> usize {
196        self.df_record_batch.num_rows()
197    }
198
199    pub fn column_vectors(
200        &self,
201        table_name: &str,
202        table_schema: SchemaRef,
203    ) -> Result<HashMap<String, VectorRef>> {
204        let mut vectors = HashMap::with_capacity(self.num_columns());
205
206        // column schemas in recordbatch must match its vectors, otherwise it's corrupted
207        for (field, array) in self
208            .df_record_batch
209            .schema()
210            .fields()
211            .iter()
212            .zip(self.df_record_batch.columns().iter())
213        {
214            let column_name = field.name();
215            let column_schema =
216                table_schema
217                    .column_schema_by_name(column_name)
218                    .context(ColumnNotExistsSnafu {
219                        table_name,
220                        column_name,
221                    })?;
222            let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
223                let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
224                    .context(ArrowComputeSnafu)?;
225                Helper::try_into_vector(array).context(DataTypesSnafu)?
226            } else {
227                Helper::try_into_vector(array).context(DataTypesSnafu)?
228            };
229
230            let _ = vectors.insert(column_name.clone(), vector);
231        }
232
233        Ok(vectors)
234    }
235
236    /// Pretty display this record batch like a table
237    pub fn pretty_print(&self) -> String {
238        pretty_format_batches(slice::from_ref(&self.df_record_batch))
239            .map(|t| t.to_string())
240            .unwrap_or("failed to pretty display a record batch".to_string())
241    }
242
243    /// Return a slice record batch starts from offset, with len rows
244    pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
245        ensure!(
246            offset + len <= self.num_rows(),
247            error::RecordBatchSliceIndexOverflowSnafu {
248                size: self.num_rows(),
249                visit_index: offset + len
250            }
251        );
252        let sliced = self.df_record_batch.slice(offset, len);
253        Ok(RecordBatch::from_df_record_batch(
254            self.schema.clone(),
255            sliced,
256        ))
257    }
258
259    /// Returns the total number of bytes of memory pointed to by the arrays in this `RecordBatch`.
260    ///
261    /// The buffers store bytes in the Arrow memory format, and include the data as well as the validity map.
262    /// Note that this does not always correspond to the exact memory usage of an array,
263    /// since multiple arrays can share the same buffers or slices thereof.
264    pub fn buffer_memory_size(&self) -> usize {
265        self.df_record_batch
266            .columns()
267            .iter()
268            .map(|array| array.get_buffer_memory_size())
269            .sum()
270    }
271
272    /// Iterate the values as strings in the column at index `i`.
273    ///
274    /// Note that if the underlying array is not a valid GreptimeDB vector, an empty iterator is
275    /// returned.
276    ///
277    /// # Panics
278    /// if index `i` is out of bound.
279    pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
280        macro_rules! iter {
281            ($column: ident) => {
282                Box::new(
283                    (0..$column.len())
284                        .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
285                )
286            };
287        }
288
289        let column = self.df_record_batch.column(i);
290        match column.data_type() {
291            ArrowDataType::Utf8 => {
292                let column = column.as_string::<i32>();
293                let iter = iter!(column);
294                iter as _
295            }
296            ArrowDataType::LargeUtf8 => {
297                let column = column.as_string::<i64>();
298                iter!(column)
299            }
300            ArrowDataType::Utf8View => {
301                let column = column.as_string_view();
302                iter!(column)
303            }
304            _ => {
305                if let Ok(column) = Helper::try_into_vector(column) {
306                    Box::new(
307                        (0..column.len())
308                            .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
309                    )
310                } else {
311                    Box::new(std::iter::empty())
312                }
313            }
314        }
315    }
316}
317
318impl Serialize for RecordBatch {
319    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
320    where
321        S: Serializer,
322    {
323        // TODO(yingwen): arrow and arrow2's schemas have different fields, so
324        // it might be better to use our `RawSchema` as serialized field.
325        let mut s = serializer.serialize_struct("record", 2)?;
326        s.serialize_field("schema", &**self.schema.arrow_schema())?;
327
328        let columns = self.df_record_batch.columns();
329        let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
330        let vec = columns
331            .iter()
332            .map(|c| c.serialize_to_json())
333            .collect::<std::result::Result<Vec<_>, _>>()
334            .map_err(S::Error::custom)?;
335
336        s.serialize_field("columns", &vec)?;
337        s.end()
338    }
339}
340
341/// merge multiple recordbatch into a single
342pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
343    let batches_len = batches.len();
344    if batches_len == 0 {
345        return Ok(RecordBatch::new_empty(schema));
346    }
347
348    let record_batch = compute::concat_batches(
349        schema.arrow_schema(),
350        batches.iter().map(|x| x.df_record_batch()),
351    )
352    .context(ArrowComputeSnafu)?;
353
354    // Create a new RecordBatch with merged columns
355    Ok(RecordBatch::from_df_record_batch(schema, record_batch))
356}
357
358fn maybe_align_json_array_with_schema(
359    schema: &ArrowSchemaRef,
360    arrays: Vec<ArrayRef>,
361) -> Result<Vec<ArrayRef>> {
362    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
363        return Ok(arrays);
364    }
365
366    let mut aligned = Vec::with_capacity(arrays.len());
367    for (field, array) in schema.fields().iter().zip(arrays) {
368        if !is_json_extension_type(field) {
369            aligned.push(array);
370            continue;
371        }
372
373        let json_array = JsonArray::from(&array)
374            .try_align(field.data_type())
375            .context(DataTypesSnafu)?;
376        aligned.push(json_array);
377    }
378    Ok(aligned)
379}
380
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{AsArray, UInt32Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{StringVector, UInt32Vector};

    use super::*;

    // Covers construction from vectors, the column accessors (by index and by
    // name), and the round trip through `from_df_record_batch` /
    // `into_df_record_batch`.
    #[test]
    fn test_record_batch() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::UInt32, false),
            Field::new("c2", DataType::UInt32, false),
        ]));
        let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());

        let c1 = Arc::new(UInt32Vector::from_slice([1, 2, 3]));
        let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6]));
        let columns: Vec<VectorRef> = vec![c1, c2];

        // The vectors above should surface as these exact Arrow arrays.
        let expected = vec![
            Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef,
            Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
        ];

        let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(expected, batch.df_record_batch().columns());
        assert_eq!(schema, batch.schema);

        assert_eq!(&expected[0], batch.column_by_name("c1").unwrap());
        assert_eq!(&expected[1], batch.column_by_name("c2").unwrap());
        // Unknown column names yield `None` rather than an error.
        assert!(batch.column_by_name("c3").is_none());

        // Rewrapping the inner DfRecordBatch must reproduce an equal batch.
        let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
        assert_eq!(batch, converted);
        assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
    }

    // Pins the exact serde JSON representation: an Arrow schema object plus
    // the columns as arrays of plain JSON values.
    #[test]
    pub fn test_serialize_recordbatch() {
        let column_schemas = vec![ColumnSchema::new(
            "number",
            ConcreteDataType::uint32_datatype(),
            false,
        )];
        let schema = Arc::new(Schema::try_new(column_schemas).unwrap());

        let numbers: Vec<u32> = (0..10).collect();
        let columns = vec![Arc::new(UInt32Vector::from_slice(numbers)) as VectorRef];
        let batch = RecordBatch::new(schema, columns).unwrap();

        let output = serde_json::to_string(&batch).unwrap();
        assert_eq!(
            r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
            output
        );
    }

    // Slicing keeps both columns in lockstep and rejects out-of-range spans.
    #[test]
    fn test_record_batch_slice() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        // Take rows 1..3 (the two middle rows).
        let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");

        let expected = &UInt32Array::from_iter_values([2u32, 3]);
        let array = recordbatch.column(0);
        let actual = array.as_primitive::<UInt32Type>();
        assert_eq!(expected, actual);

        let expected = &StringArray::from(vec!["hello", "greptime"]);
        let array = recordbatch.column(1);
        let actual = array.as_string::<i32>();
        assert_eq!(expected, actual);

        // offset + len (1 + 5) exceeds the 4-row batch, so slicing must fail.
        assert!(recordbatch.slice(1, 5).is_err());
    }

    // Merging two 4-row batches with identical schemas yields one 8-row batch.
    #[test]
    fn test_merge_record_batch() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();

        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch2 = RecordBatch::new(schema.clone(), columns).unwrap();

        let merged = merge_record_batches(schema.clone(), &[recordbatch, recordbatch2])
            .expect("merge recordbatch");
        assert_eq!(merged.num_rows(), 8);
    }
}