common_recordbatch/
recordbatch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::{Helper, VectorRef};
28use serde::ser::{Error, SerializeStruct};
29use serde::{Serialize, Serializer};
30use snafu::{OptionExt, ResultExt, ensure};
31
32use crate::DfRecordBatch;
33use crate::error::{
34    self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35    NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
36};
37
38/// A two-dimensional batch of column-oriented data with a defined schema.
39#[derive(Clone, Debug, PartialEq)]
40pub struct RecordBatch {
41    pub schema: SchemaRef,
42    df_record_batch: DfRecordBatch,
43}
44
45impl RecordBatch {
46    /// Create a new [`RecordBatch`] from `schema` and `columns`.
47    pub fn new<I: IntoIterator<Item = VectorRef>>(
48        schema: SchemaRef,
49        columns: I,
50    ) -> Result<RecordBatch> {
51        let columns: Vec<_> = columns.into_iter().collect();
52        let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
53
54        // Casting the arrays here to match the schema, is a temporary solution to support Arrow's
55        // view array types (`StringViewArray` and `BinaryViewArray`).
56        // As to "support": the arrays here are created from vectors, which do not have types
57        // corresponding to view arrays. What we can do is to only cast them.
58        // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
59        // the casting here will be removed in the end.
60        // TODO(LFC): Remove the casting here once `Batch` is no longer used.
61        let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
62
63        let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64
65        let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
66            .context(error::NewDfRecordBatchSnafu)?;
67
68        Ok(RecordBatch {
69            schema,
70            df_record_batch,
71        })
72    }
73
74    fn cast_view_arrays(
75        schema: &ArrowSchemaRef,
76        mut arrays: Vec<ArrayRef>,
77    ) -> Result<Vec<ArrayRef>> {
78        for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
79            let expected = f.data_type();
80            let actual = a.data_type();
81            if matches!(
82                (expected, actual),
83                (ArrowDataType::Utf8View, ArrowDataType::Utf8)
84                    | (ArrowDataType::BinaryView, ArrowDataType::Binary)
85            ) {
86                *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
87            }
88        }
89        Ok(arrays)
90    }
91
92    /// Create an empty [`RecordBatch`] from `schema`.
93    pub fn new_empty(schema: SchemaRef) -> RecordBatch {
94        let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
95        RecordBatch {
96            schema,
97            df_record_batch,
98        }
99    }
100
101    /// Create an empty [`RecordBatch`] from `schema` with `num_rows`.
102    pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
103        let df_record_batch = DfRecordBatch::try_new_with_options(
104            schema.arrow_schema().clone(),
105            vec![],
106            &RecordBatchOptions::new().with_row_count(Some(num_rows)),
107        )
108        .context(error::NewDfRecordBatchSnafu)?;
109        Ok(RecordBatch {
110            schema,
111            df_record_batch,
112        })
113    }
114
115    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
116        let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
117        let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
118            ProjectArrowRecordBatchSnafu {
119                schema: self.schema.clone(),
120                projection: indices.to_vec(),
121            }
122        })?;
123
124        Ok(Self {
125            schema,
126            df_record_batch,
127        })
128    }
129
130    /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`.
131    ///
132    /// This method doesn't check the schema.
133    pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
134        RecordBatch {
135            schema,
136            df_record_batch,
137        }
138    }
139
140    #[inline]
141    pub fn df_record_batch(&self) -> &DfRecordBatch {
142        &self.df_record_batch
143    }
144
145    #[inline]
146    pub fn into_df_record_batch(self) -> DfRecordBatch {
147        self.df_record_batch
148    }
149
150    #[inline]
151    pub fn columns(&self) -> &[ArrayRef] {
152        self.df_record_batch.columns()
153    }
154
155    #[inline]
156    pub fn column(&self, idx: usize) -> &ArrayRef {
157        self.df_record_batch.column(idx)
158    }
159
160    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
161        self.df_record_batch.column_by_name(name)
162    }
163
164    #[inline]
165    pub fn num_columns(&self) -> usize {
166        self.df_record_batch.num_columns()
167    }
168
169    #[inline]
170    pub fn num_rows(&self) -> usize {
171        self.df_record_batch.num_rows()
172    }
173
174    pub fn column_vectors(
175        &self,
176        table_name: &str,
177        table_schema: SchemaRef,
178    ) -> Result<HashMap<String, VectorRef>> {
179        let mut vectors = HashMap::with_capacity(self.num_columns());
180
181        // column schemas in recordbatch must match its vectors, otherwise it's corrupted
182        for (field, array) in self
183            .df_record_batch
184            .schema()
185            .fields()
186            .iter()
187            .zip(self.df_record_batch.columns().iter())
188        {
189            let column_name = field.name();
190            let column_schema =
191                table_schema
192                    .column_schema_by_name(column_name)
193                    .context(ColumnNotExistsSnafu {
194                        table_name,
195                        column_name,
196                    })?;
197            let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
198                let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
199                    .context(ArrowComputeSnafu)?;
200                Helper::try_into_vector(array).context(DataTypesSnafu)?
201            } else {
202                Helper::try_into_vector(array).context(DataTypesSnafu)?
203            };
204
205            let _ = vectors.insert(column_name.clone(), vector);
206        }
207
208        Ok(vectors)
209    }
210
211    /// Pretty display this record batch like a table
212    pub fn pretty_print(&self) -> String {
213        pretty_format_batches(slice::from_ref(&self.df_record_batch))
214            .map(|t| t.to_string())
215            .unwrap_or("failed to pretty display a record batch".to_string())
216    }
217
218    /// Return a slice record batch starts from offset, with len rows
219    pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
220        ensure!(
221            offset + len <= self.num_rows(),
222            error::RecordBatchSliceIndexOverflowSnafu {
223                size: self.num_rows(),
224                visit_index: offset + len
225            }
226        );
227        let sliced = self.df_record_batch.slice(offset, len);
228        Ok(RecordBatch::from_df_record_batch(
229            self.schema.clone(),
230            sliced,
231        ))
232    }
233
234    /// Returns the total number of bytes of memory pointed to by the arrays in this `RecordBatch`.
235    ///
236    /// The buffers store bytes in the Arrow memory format, and include the data as well as the validity map.
237    /// Note that this does not always correspond to the exact memory usage of an array,
238    /// since multiple arrays can share the same buffers or slices thereof.
239    pub fn buffer_memory_size(&self) -> usize {
240        self.df_record_batch
241            .columns()
242            .iter()
243            .map(|array| array.get_buffer_memory_size())
244            .sum()
245    }
246
247    /// Iterate the values as strings in the column at index `i`.
248    ///
249    /// Note that if the underlying array is not a valid GreptimeDB vector, an empty iterator is
250    /// returned.
251    ///
252    /// # Panics
253    /// if index `i` is out of bound.
254    pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
255        macro_rules! iter {
256            ($column: ident) => {
257                Box::new(
258                    (0..$column.len())
259                        .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
260                )
261            };
262        }
263
264        let column = self.df_record_batch.column(i);
265        match column.data_type() {
266            ArrowDataType::Utf8 => {
267                let column = column.as_string::<i32>();
268                let iter = iter!(column);
269                iter as _
270            }
271            ArrowDataType::LargeUtf8 => {
272                let column = column.as_string::<i64>();
273                iter!(column)
274            }
275            ArrowDataType::Utf8View => {
276                let column = column.as_string_view();
277                iter!(column)
278            }
279            _ => {
280                if let Ok(column) = Helper::try_into_vector(column) {
281                    Box::new(
282                        (0..column.len())
283                            .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
284                    )
285                } else {
286                    Box::new(std::iter::empty())
287                }
288            }
289        }
290    }
291}
292
293impl Serialize for RecordBatch {
294    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
295    where
296        S: Serializer,
297    {
298        // TODO(yingwen): arrow and arrow2's schemas have different fields, so
299        // it might be better to use our `RawSchema` as serialized field.
300        let mut s = serializer.serialize_struct("record", 2)?;
301        s.serialize_field("schema", &**self.schema.arrow_schema())?;
302
303        let columns = self.df_record_batch.columns();
304        let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
305        let vec = columns
306            .iter()
307            .map(|c| c.serialize_to_json())
308            .collect::<std::result::Result<Vec<_>, _>>()
309            .map_err(S::Error::custom)?;
310
311        s.serialize_field("columns", &vec)?;
312        s.end()
313    }
314}
315
316/// merge multiple recordbatch into a single
317pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
318    let batches_len = batches.len();
319    if batches_len == 0 {
320        return Ok(RecordBatch::new_empty(schema));
321    }
322
323    let record_batch = compute::concat_batches(
324        schema.arrow_schema(),
325        batches.iter().map(|x| x.df_record_batch()),
326    )
327    .context(ArrowComputeSnafu)?;
328
329    // Create a new RecordBatch with merged columns
330    Ok(RecordBatch::from_df_record_batch(schema, record_batch))
331}
332
333/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
334/// "largest" json type after some insertions in the table schema, while the json array previously
335/// written in the SST could be lagged behind it. So it's important to "amend" the json array's
336/// missing fields with null arrays, to align the array's data type with the provided one.
337///
338/// # Panics
339///
340/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
341///   of Struct type. Both of which shouldn't happen unless we switch our implementation of how
342///   json array is physically stored.
343pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
344    let json_type = json_array.data_type();
345    if json_type == schema_type {
346        return Ok(json_array.clone());
347    }
348
349    let json_array = json_array.as_struct();
350    let array_fields = json_array.fields();
351    let array_columns = json_array.columns();
352    let ArrowDataType::Struct(schema_fields) = schema_type else {
353        unreachable!()
354    };
355    let mut aligned = Vec::with_capacity(schema_fields.len());
356
357    // Compare the fields in the json array and the to-be-aligned schema, amending with null arrays
358    // on the way. It's very important to note that fields in the json array and in the json type
359    // are both SORTED.
360
361    let mut i = 0; // point to the schema fields
362    let mut j = 0; // point to the array fields
363    while i < schema_fields.len() && j < array_fields.len() {
364        let schema_field = &schema_fields[i];
365        let array_field = &array_fields[j];
366        if schema_field.name() == array_field.name() {
367            if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
368                // A `StructArray`s in a json array must be another json array. (Like a nested json
369                // object in a json value.)
370                aligned.push(align_json_array(
371                    &array_columns[j],
372                    schema_field.data_type(),
373                )?);
374            } else {
375                aligned.push(array_columns[j].clone());
376            }
377            j += 1;
378        } else {
379            aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
380        }
381        i += 1;
382    }
383    if i < schema_fields.len() {
384        for field in &schema_fields[i..] {
385            aligned.push(new_null_array(field.data_type(), json_array.len()));
386        }
387    }
388    ensure!(
389        j == array_fields.len(),
390        AlignJsonArraySnafu {
391            reason: format!(
392                "this json array has more fields {:?}",
393                array_fields[j..]
394                    .iter()
395                    .map(|x| x.name())
396                    .collect::<Vec<_>>(),
397            )
398        }
399    );
400
401    let json_array =
402        StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
403            .context(NewDfRecordBatchSnafu)?;
404    Ok(Arc::new(json_array))
405}
406
407fn maybe_align_json_array_with_schema(
408    schema: &ArrowSchemaRef,
409    arrays: Vec<ArrayRef>,
410) -> Result<Vec<ArrayRef>> {
411    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
412        return Ok(arrays);
413    }
414
415    let mut aligned = Vec::with_capacity(arrays.len());
416    for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
417        if !is_json_extension_type(field) {
418            aligned.push(array);
419            continue;
420        }
421
422        let json_array = align_json_array(&array, field.data_type())?;
423        aligned.push(json_array);
424    }
425    Ok(aligned)
426}
427
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
    };
    use datatypes::arrow::datatypes::{
        DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
    };
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{StringVector, UInt32Vector};

    use super::*;

    /// Runs [`align_json_array`] on `json_array` against the struct type built from
    /// `schema_type`, then checks the outcome against `expected`.
    ///
    /// `expected` is either the child arrays the aligned struct array must consist of (with no
    /// top-level nulls), or the exact error message alignment must fail with. An unexpected
    /// alignment error is returned so that the calling test fails with it.
    fn check_align(
        json_array: StructArray,
        schema_type: Fields,
        expected: std::result::Result<Vec<ArrayRef>, String>,
    ) -> Result<()> {
        let json_array: ArrayRef = Arc::new(json_array);
        let target_type = DataType::Struct(schema_type.clone());
        let expected = expected.map(|children| {
            Arc::new(StructArray::new(schema_type, children, None)) as ArrayRef
        });

        match (align_json_array(&json_array, &target_type), expected) {
            (Ok(actual), Ok(expected)) => assert_eq!(&actual, &expected),
            (Ok(actual), Err(e)) => {
                panic!("expecting error {e} but actually get: {actual:?}")
            }
            (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
            (Err(e), Ok(_)) => return Err(e),
        }
        Ok(())
    }

    /// Builds the `numbers` / `strings` schema shared by the slice and merge tests.
    fn numbers_strings_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ]))
    }

    /// Builds four rows of column data matching `numbers_strings_schema`.
    fn numbers_strings_columns() -> Vec<VectorRef> {
        vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ]
    }

    #[test]
    fn test_align_json_array() -> Result<()> {
        // An empty json array can be aligned with a complex json type.
        check_align(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )?;

        // A flat json array only needs its missing trailing field padded.
        check_align(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )?;

        // Leading, nested and existing fields are all handled in one pass.
        check_align(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )?;

        // A json array with a field the schema type lacks cannot be aligned.
        check_align(
            StructArray::try_from(vec![
                ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
                ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
            ])
            .unwrap(),
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err(
                r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
                    .to_string(),
            ),
        )?;

        Ok(())
    }

    #[test]
    fn test_record_batch() {
        let schema = Arc::new(
            Schema::try_from(Arc::new(ArrowSchema::new(vec![
                Field::new("c1", DataType::UInt32, false),
                Field::new("c2", DataType::UInt32, false),
            ])))
            .unwrap(),
        );
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice([1, 2, 3])),
            Arc::new(UInt32Vector::from_slice([4, 5, 6])),
        ];
        let expected: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from_iter_values([1, 2, 3])),
            Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
        ];

        let batch = RecordBatch::new(schema.clone(), columns).unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(expected, batch.df_record_batch().columns());
        assert_eq!(schema, batch.schema);

        for (name, want) in ["c1", "c2"].into_iter().zip(expected.iter()) {
            assert_eq!(want, batch.column_by_name(name).unwrap());
        }
        assert!(batch.column_by_name("c3").is_none());

        let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
        assert_eq!(batch, converted);
        assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
    }

    #[test]
    pub fn test_serialize_recordbatch() {
        let schema = Arc::new(
            Schema::try_new(vec![ColumnSchema::new(
                "number",
                ConcreteDataType::uint32_datatype(),
                false,
            )])
            .unwrap(),
        );
        let numbers: Vec<u32> = (0..10).collect();
        let batch = RecordBatch::new(
            schema,
            vec![Arc::new(UInt32Vector::from_slice(numbers)) as VectorRef],
        )
        .unwrap();

        let expected = r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#;
        assert_eq!(expected, serde_json::to_string(&batch).unwrap());
    }

    #[test]
    fn test_record_batch_slice() {
        let recordbatch = RecordBatch::new(numbers_strings_schema(), numbers_strings_columns())
            .unwrap()
            .slice(1, 2)
            .expect("recordbatch slice");

        assert_eq!(
            &UInt32Array::from_iter_values([2u32, 3]),
            recordbatch.column(0).as_primitive::<UInt32Type>()
        );
        assert_eq!(
            &StringArray::from(vec!["hello", "greptime"]),
            recordbatch.column(1).as_string::<i32>()
        );

        assert!(recordbatch.slice(1, 5).is_err());
    }

    #[test]
    fn test_merge_record_batch() {
        let schema = numbers_strings_schema();
        let batches = [
            RecordBatch::new(schema.clone(), numbers_strings_columns()).unwrap(),
            RecordBatch::new(schema.clone(), numbers_strings_columns()).unwrap(),
        ];

        let merged = merge_record_batches(schema, &batches).expect("merge recordbatch");
        assert_eq!(merged.num_rows(), 8);
    }
}