// common_recordbatch/recordbatch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::{Helper, VectorRef};
28use serde::ser::{Error, SerializeStruct};
29use serde::{Serialize, Serializer};
30use snafu::{OptionExt, ResultExt, ensure};
31
32use crate::DfRecordBatch;
33use crate::error::{
34    self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35    NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
36};
37
/// A two-dimensional batch of column-oriented data with a defined schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    /// Schema describing the columns of this batch.
    pub schema: SchemaRef,
    /// The underlying Arrow/DataFusion record batch that holds the column data.
    /// Kept private so it stays consistent with `schema`.
    df_record_batch: DfRecordBatch,
}
44
45impl RecordBatch {
46    /// Create a new [`RecordBatch`] from `schema` and `columns`.
47    pub fn new<I: IntoIterator<Item = VectorRef>>(
48        schema: SchemaRef,
49        columns: I,
50    ) -> Result<RecordBatch> {
51        let columns: Vec<_> = columns.into_iter().collect();
52        let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
53
54        // Casting the arrays here to match the schema, is a temporary solution to support Arrow's
55        // view array types (`StringViewArray` and `BinaryViewArray`).
56        // As to "support": the arrays here are created from vectors, which do not have types
57        // corresponding to view arrays. What we can do is to only cast them.
58        // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
59        // the casting here will be removed in the end.
60        // TODO(LFC): Remove the casting here once `Batch` is no longer used.
61        let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
62
63        let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64
65        let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
66            .context(error::NewDfRecordBatchSnafu)?;
67
68        Ok(RecordBatch {
69            schema,
70            df_record_batch,
71        })
72    }
73
74    pub fn to_df_record_batch<I: IntoIterator<Item = VectorRef>>(
75        arrow_schema: ArrowSchemaRef,
76        columns: I,
77    ) -> Result<DfRecordBatch> {
78        let columns: Vec<_> = columns.into_iter().collect();
79        let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
80
81        // Casting the arrays here to match the schema, is a temporary solution to support Arrow's
82        // view array types (`StringViewArray` and `BinaryViewArray`).
83        // As to "support": the arrays here are created from vectors, which do not have types
84        // corresponding to view arrays. What we can do is to only cast them.
85        // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path.
86        // the casting here will be removed in the end.
87        // TODO(LFC): Remove the casting here once `Batch` is no longer used.
88        let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?;
89
90        let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?;
91
92        let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays)
93            .context(error::NewDfRecordBatchSnafu)?;
94
95        Ok(df_record_batch)
96    }
97
98    fn cast_view_arrays(
99        schema: &ArrowSchemaRef,
100        mut arrays: Vec<ArrayRef>,
101    ) -> Result<Vec<ArrayRef>> {
102        for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
103            let expected = f.data_type();
104            let actual = a.data_type();
105            if matches!(
106                (expected, actual),
107                (ArrowDataType::Utf8View, ArrowDataType::Utf8)
108                    | (ArrowDataType::BinaryView, ArrowDataType::Binary)
109            ) {
110                *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
111            }
112        }
113        Ok(arrays)
114    }
115
116    /// Create an empty [`RecordBatch`] from `schema`.
117    pub fn new_empty(schema: SchemaRef) -> RecordBatch {
118        let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
119        RecordBatch {
120            schema,
121            df_record_batch,
122        }
123    }
124
125    /// Create an empty [`RecordBatch`] from `schema` with `num_rows`.
126    pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
127        let df_record_batch = DfRecordBatch::try_new_with_options(
128            schema.arrow_schema().clone(),
129            vec![],
130            &RecordBatchOptions::new().with_row_count(Some(num_rows)),
131        )
132        .context(error::NewDfRecordBatchSnafu)?;
133        Ok(RecordBatch {
134            schema,
135            df_record_batch,
136        })
137    }
138
139    pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
140        let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
141        let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
142            ProjectArrowRecordBatchSnafu {
143                schema: self.schema.clone(),
144                projection: indices.to_vec(),
145            }
146        })?;
147
148        Ok(Self {
149            schema,
150            df_record_batch,
151        })
152    }
153
154    /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`.
155    ///
156    /// This method doesn't check the schema.
157    pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
158        RecordBatch {
159            schema,
160            df_record_batch,
161        }
162    }
163
164    #[inline]
165    pub fn df_record_batch(&self) -> &DfRecordBatch {
166        &self.df_record_batch
167    }
168
169    #[inline]
170    pub fn into_df_record_batch(self) -> DfRecordBatch {
171        self.df_record_batch
172    }
173
174    #[inline]
175    pub fn columns(&self) -> &[ArrayRef] {
176        self.df_record_batch.columns()
177    }
178
179    #[inline]
180    pub fn column(&self, idx: usize) -> &ArrayRef {
181        self.df_record_batch.column(idx)
182    }
183
184    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
185        self.df_record_batch.column_by_name(name)
186    }
187
188    #[inline]
189    pub fn num_columns(&self) -> usize {
190        self.df_record_batch.num_columns()
191    }
192
193    #[inline]
194    pub fn num_rows(&self) -> usize {
195        self.df_record_batch.num_rows()
196    }
197
198    pub fn column_vectors(
199        &self,
200        table_name: &str,
201        table_schema: SchemaRef,
202    ) -> Result<HashMap<String, VectorRef>> {
203        let mut vectors = HashMap::with_capacity(self.num_columns());
204
205        // column schemas in recordbatch must match its vectors, otherwise it's corrupted
206        for (field, array) in self
207            .df_record_batch
208            .schema()
209            .fields()
210            .iter()
211            .zip(self.df_record_batch.columns().iter())
212        {
213            let column_name = field.name();
214            let column_schema =
215                table_schema
216                    .column_schema_by_name(column_name)
217                    .context(ColumnNotExistsSnafu {
218                        table_name,
219                        column_name,
220                    })?;
221            let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
222                let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
223                    .context(ArrowComputeSnafu)?;
224                Helper::try_into_vector(array).context(DataTypesSnafu)?
225            } else {
226                Helper::try_into_vector(array).context(DataTypesSnafu)?
227            };
228
229            let _ = vectors.insert(column_name.clone(), vector);
230        }
231
232        Ok(vectors)
233    }
234
235    /// Pretty display this record batch like a table
236    pub fn pretty_print(&self) -> String {
237        pretty_format_batches(slice::from_ref(&self.df_record_batch))
238            .map(|t| t.to_string())
239            .unwrap_or("failed to pretty display a record batch".to_string())
240    }
241
242    /// Return a slice record batch starts from offset, with len rows
243    pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
244        ensure!(
245            offset + len <= self.num_rows(),
246            error::RecordBatchSliceIndexOverflowSnafu {
247                size: self.num_rows(),
248                visit_index: offset + len
249            }
250        );
251        let sliced = self.df_record_batch.slice(offset, len);
252        Ok(RecordBatch::from_df_record_batch(
253            self.schema.clone(),
254            sliced,
255        ))
256    }
257
258    /// Returns the total number of bytes of memory pointed to by the arrays in this `RecordBatch`.
259    ///
260    /// The buffers store bytes in the Arrow memory format, and include the data as well as the validity map.
261    /// Note that this does not always correspond to the exact memory usage of an array,
262    /// since multiple arrays can share the same buffers or slices thereof.
263    pub fn buffer_memory_size(&self) -> usize {
264        self.df_record_batch
265            .columns()
266            .iter()
267            .map(|array| array.get_buffer_memory_size())
268            .sum()
269    }
270
271    /// Iterate the values as strings in the column at index `i`.
272    ///
273    /// Note that if the underlying array is not a valid GreptimeDB vector, an empty iterator is
274    /// returned.
275    ///
276    /// # Panics
277    /// if index `i` is out of bound.
278    pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
279        macro_rules! iter {
280            ($column: ident) => {
281                Box::new(
282                    (0..$column.len())
283                        .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
284                )
285            };
286        }
287
288        let column = self.df_record_batch.column(i);
289        match column.data_type() {
290            ArrowDataType::Utf8 => {
291                let column = column.as_string::<i32>();
292                let iter = iter!(column);
293                iter as _
294            }
295            ArrowDataType::LargeUtf8 => {
296                let column = column.as_string::<i64>();
297                iter!(column)
298            }
299            ArrowDataType::Utf8View => {
300                let column = column.as_string_view();
301                iter!(column)
302            }
303            _ => {
304                if let Ok(column) = Helper::try_into_vector(column) {
305                    Box::new(
306                        (0..column.len())
307                            .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
308                    )
309                } else {
310                    Box::new(std::iter::empty())
311                }
312            }
313        }
314    }
315}
316
317impl Serialize for RecordBatch {
318    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
319    where
320        S: Serializer,
321    {
322        // TODO(yingwen): arrow and arrow2's schemas have different fields, so
323        // it might be better to use our `RawSchema` as serialized field.
324        let mut s = serializer.serialize_struct("record", 2)?;
325        s.serialize_field("schema", &**self.schema.arrow_schema())?;
326
327        let columns = self.df_record_batch.columns();
328        let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
329        let vec = columns
330            .iter()
331            .map(|c| c.serialize_to_json())
332            .collect::<std::result::Result<Vec<_>, _>>()
333            .map_err(S::Error::custom)?;
334
335        s.serialize_field("columns", &vec)?;
336        s.end()
337    }
338}
339
340/// merge multiple recordbatch into a single
341pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
342    let batches_len = batches.len();
343    if batches_len == 0 {
344        return Ok(RecordBatch::new_empty(schema));
345    }
346
347    let record_batch = compute::concat_batches(
348        schema.arrow_schema(),
349        batches.iter().map(|x| x.df_record_batch()),
350    )
351    .context(ArrowComputeSnafu)?;
352
353    // Create a new RecordBatch with merged columns
354    Ok(RecordBatch::from_df_record_batch(schema, record_batch))
355}
356
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
/// "largest" json type after some insertions in the table schema, while the json array previously
/// written in the SST could be lagged behind it. So it's important to "amend" the json array's
/// missing fields with null arrays, to align the array's data type with the provided one.
///
/// # Panics
///
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
///   of Struct type. Both of which shouldn't happen unless we switch our implementation of how
///   json array is physically stored.
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
    let json_type = json_array.data_type();
    // Fast path: already the exact target type, nothing to amend.
    if json_type == schema_type {
        return Ok(json_array.clone());
    }

    // Panics (per the doc above) if the array is not a StructArray.
    let json_array = json_array.as_struct();
    let array_fields = json_array.fields();
    let array_columns = json_array.columns();
    let ArrowDataType::Struct(schema_fields) = schema_type else {
        unreachable!()
    };
    let mut aligned = Vec::with_capacity(schema_fields.len());

    // Compare the fields in the json array and the to-be-aligned schema, amending with null arrays
    // on the way. It's very important to note that fields in the json array and in the json type
    // are both SORTED.

    let mut i = 0; // point to the schema fields
    let mut j = 0; // point to the array fields
    while i < schema_fields.len() && j < array_fields.len() {
        let schema_field = &schema_fields[i];
        let array_field = &array_fields[j];
        if schema_field.name() == array_field.name() {
            if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
                // A `StructArray`s in a json array must be another json array. (Like a nested json
                // object in a json value.)
                aligned.push(align_json_array(
                    &array_columns[j],
                    schema_field.data_type(),
                )?);
            } else {
                aligned.push(array_columns[j].clone());
            }
            j += 1;
        } else {
            // Names differ: because both sides are sorted, the schema field is
            // assumed missing from the array — fill it with nulls. (If the
            // array instead had an extra field, `j` never catches up and the
            // `ensure!` below reports it.)
            aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
        }
        i += 1;
    }
    // Remaining schema fields past the end of the array's fields are all missing.
    if i < schema_fields.len() {
        for field in &schema_fields[i..] {
            aligned.push(new_null_array(field.data_type(), json_array.len()));
        }
    }
    // Any array field left unconsumed has no counterpart in the schema, which
    // cannot be aligned — fail with the offending field names.
    ensure!(
        j == array_fields.len(),
        AlignJsonArraySnafu {
            reason: format!(
                "this json array has more fields {:?}",
                array_fields[j..]
                    .iter()
                    .map(|x| x.name())
                    .collect::<Vec<_>>(),
            )
        }
    );

    // Rebuild the struct array with the target fields, preserving the
    // top-level null buffer of the original array.
    let json_array =
        StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
            .context(NewDfRecordBatchSnafu)?;
    Ok(Arc::new(json_array))
}
430
431fn maybe_align_json_array_with_schema(
432    schema: &ArrowSchemaRef,
433    arrays: Vec<ArrayRef>,
434) -> Result<Vec<ArrayRef>> {
435    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
436        return Ok(arrays);
437    }
438
439    let mut aligned = Vec::with_capacity(arrays.len());
440    for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
441        if !is_json_extension_type(field) {
442            aligned.push(array);
443            continue;
444        }
445
446        let json_array = align_json_array(&array, field.data_type())?;
447        aligned.push(json_array);
448    }
449    Ok(aligned)
450}
451
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
    };
    use datatypes::arrow::datatypes::{
        DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
    };
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{StringVector, UInt32Vector};

    use super::*;

    #[test]
    fn test_align_json_array() -> Result<()> {
        // One alignment scenario: align `json_array` to `schema_type` and
        // compare the outcome (aligned array or error message) to `expected`.
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    // Expected columns are wrapped into a struct array with the
                    // target fields, mirroring what alignment produces.
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            // Run the alignment and check both the success and the error paths.
            fn test(self) -> Result<()> {
                let result = align_json_array(&self.json_array, &self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // Test empty json array can be aligned with a complex json type.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Test simple json array alignment.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Test complex json array alignment.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        // Test align failed.
        // The array has a field ("j") absent from the schema, which is not alignable.
        TestCase::new(
            StructArray::try_from(vec![
                ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
                ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
            ])
            .unwrap(),
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err(
                r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
                    .to_string(),
            ),
        )
        .test()?;
        Ok(())
    }

    // Construction, accessors, and round-trip to/from DfRecordBatch.
    #[test]
    fn test_record_batch() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::UInt32, false),
            Field::new("c2", DataType::UInt32, false),
        ]));
        let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());

        let c1 = Arc::new(UInt32Vector::from_slice([1, 2, 3]));
        let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6]));
        let columns: Vec<VectorRef> = vec![c1, c2];

        let expected = vec![
            Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef,
            Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
        ];

        let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(expected, batch.df_record_batch().columns());
        assert_eq!(schema, batch.schema);

        assert_eq!(&expected[0], batch.column_by_name("c1").unwrap());
        assert_eq!(&expected[1], batch.column_by_name("c2").unwrap());
        assert!(batch.column_by_name("c3").is_none());

        let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
        assert_eq!(batch, converted);
        assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
    }

    // Pins the exact serde JSON output shape (schema + column values).
    #[test]
    pub fn test_serialize_recordbatch() {
        let column_schemas = vec![ColumnSchema::new(
            "number",
            ConcreteDataType::uint32_datatype(),
            false,
        )];
        let schema = Arc::new(Schema::try_new(column_schemas).unwrap());

        let numbers: Vec<u32> = (0..10).collect();
        let columns = vec![Arc::new(UInt32Vector::from_slice(numbers)) as VectorRef];
        let batch = RecordBatch::new(schema, columns).unwrap();

        let output = serde_json::to_string(&batch).unwrap();
        assert_eq!(
            r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
            output
        );
    }

    // Slicing in-bounds returns the expected rows; out-of-bounds is an error.
    #[test]
    fn test_record_batch_slice() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");

        let expected = &UInt32Array::from_iter_values([2u32, 3]);
        let array = recordbatch.column(0);
        let actual = array.as_primitive::<UInt32Type>();
        assert_eq!(expected, actual);

        let expected = &StringArray::from(vec!["hello", "greptime"]);
        let array = recordbatch.column(1);
        let actual = array.as_string::<i32>();
        assert_eq!(expected, actual);

        // 1 + 5 exceeds the 4-row batch, so this must fail.
        assert!(recordbatch.slice(1, 5).is_err());
    }

    // Merging two 4-row batches with the same schema yields one 8-row batch.
    #[test]
    fn test_merge_record_batch() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();

        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch2 = RecordBatch::new(schema.clone(), columns).unwrap();

        let merged = merge_record_batches(schema.clone(), &[recordbatch, recordbatch2])
            .expect("merge recordbatch");
        assert_eq!(merged.num_rows(), 8);
    }
}