1use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::{Helper, VectorRef};
28use serde::ser::{Error, SerializeStruct};
29use serde::{Serialize, Serializer};
30use snafu::{OptionExt, ResultExt, ensure};
31
32use crate::DfRecordBatch;
33use crate::error::{
34 self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35 NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
36};
37
38#[derive(Clone, Debug, PartialEq)]
40pub struct RecordBatch {
41 pub schema: SchemaRef,
42 df_record_batch: DfRecordBatch,
43}
44
45impl RecordBatch {
46 pub fn new<I: IntoIterator<Item = VectorRef>>(
48 schema: SchemaRef,
49 columns: I,
50 ) -> Result<RecordBatch> {
51 let columns: Vec<_> = columns.into_iter().collect();
52 let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
53
54 let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
62
63 let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64
65 let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
66 .context(error::NewDfRecordBatchSnafu)?;
67
68 Ok(RecordBatch {
69 schema,
70 df_record_batch,
71 })
72 }
73
74 fn cast_view_arrays(
75 schema: &ArrowSchemaRef,
76 mut arrays: Vec<ArrayRef>,
77 ) -> Result<Vec<ArrayRef>> {
78 for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
79 let expected = f.data_type();
80 let actual = a.data_type();
81 if matches!(
82 (expected, actual),
83 (ArrowDataType::Utf8View, ArrowDataType::Utf8)
84 | (ArrowDataType::BinaryView, ArrowDataType::Binary)
85 ) {
86 *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
87 }
88 }
89 Ok(arrays)
90 }
91
92 pub fn new_empty(schema: SchemaRef) -> RecordBatch {
94 let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
95 RecordBatch {
96 schema,
97 df_record_batch,
98 }
99 }
100
101 pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
103 let df_record_batch = DfRecordBatch::try_new_with_options(
104 schema.arrow_schema().clone(),
105 vec![],
106 &RecordBatchOptions::new().with_row_count(Some(num_rows)),
107 )
108 .context(error::NewDfRecordBatchSnafu)?;
109 Ok(RecordBatch {
110 schema,
111 df_record_batch,
112 })
113 }
114
115 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
116 let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
117 let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
118 ProjectArrowRecordBatchSnafu {
119 schema: self.schema.clone(),
120 projection: indices.to_vec(),
121 }
122 })?;
123
124 Ok(Self {
125 schema,
126 df_record_batch,
127 })
128 }
129
130 pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
134 RecordBatch {
135 schema,
136 df_record_batch,
137 }
138 }
139
140 #[inline]
141 pub fn df_record_batch(&self) -> &DfRecordBatch {
142 &self.df_record_batch
143 }
144
145 #[inline]
146 pub fn into_df_record_batch(self) -> DfRecordBatch {
147 self.df_record_batch
148 }
149
150 #[inline]
151 pub fn columns(&self) -> &[ArrayRef] {
152 self.df_record_batch.columns()
153 }
154
155 #[inline]
156 pub fn column(&self, idx: usize) -> &ArrayRef {
157 self.df_record_batch.column(idx)
158 }
159
160 pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
161 self.df_record_batch.column_by_name(name)
162 }
163
164 #[inline]
165 pub fn num_columns(&self) -> usize {
166 self.df_record_batch.num_columns()
167 }
168
169 #[inline]
170 pub fn num_rows(&self) -> usize {
171 self.df_record_batch.num_rows()
172 }
173
174 pub fn column_vectors(
175 &self,
176 table_name: &str,
177 table_schema: SchemaRef,
178 ) -> Result<HashMap<String, VectorRef>> {
179 let mut vectors = HashMap::with_capacity(self.num_columns());
180
181 for (field, array) in self
183 .df_record_batch
184 .schema()
185 .fields()
186 .iter()
187 .zip(self.df_record_batch.columns().iter())
188 {
189 let column_name = field.name();
190 let column_schema =
191 table_schema
192 .column_schema_by_name(column_name)
193 .context(ColumnNotExistsSnafu {
194 table_name,
195 column_name,
196 })?;
197 let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
198 let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
199 .context(ArrowComputeSnafu)?;
200 Helper::try_into_vector(array).context(DataTypesSnafu)?
201 } else {
202 Helper::try_into_vector(array).context(DataTypesSnafu)?
203 };
204
205 let _ = vectors.insert(column_name.clone(), vector);
206 }
207
208 Ok(vectors)
209 }
210
211 pub fn pretty_print(&self) -> String {
213 pretty_format_batches(slice::from_ref(&self.df_record_batch))
214 .map(|t| t.to_string())
215 .unwrap_or("failed to pretty display a record batch".to_string())
216 }
217
218 pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
220 ensure!(
221 offset + len <= self.num_rows(),
222 error::RecordBatchSliceIndexOverflowSnafu {
223 size: self.num_rows(),
224 visit_index: offset + len
225 }
226 );
227 let sliced = self.df_record_batch.slice(offset, len);
228 Ok(RecordBatch::from_df_record_batch(
229 self.schema.clone(),
230 sliced,
231 ))
232 }
233
234 pub fn buffer_memory_size(&self) -> usize {
240 self.df_record_batch
241 .columns()
242 .iter()
243 .map(|array| array.get_buffer_memory_size())
244 .sum()
245 }
246
247 pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
255 macro_rules! iter {
256 ($column: ident) => {
257 Box::new(
258 (0..$column.len())
259 .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
260 )
261 };
262 }
263
264 let column = self.df_record_batch.column(i);
265 match column.data_type() {
266 ArrowDataType::Utf8 => {
267 let column = column.as_string::<i32>();
268 let iter = iter!(column);
269 iter as _
270 }
271 ArrowDataType::LargeUtf8 => {
272 let column = column.as_string::<i64>();
273 iter!(column)
274 }
275 ArrowDataType::Utf8View => {
276 let column = column.as_string_view();
277 iter!(column)
278 }
279 _ => {
280 if let Ok(column) = Helper::try_into_vector(column) {
281 Box::new(
282 (0..column.len())
283 .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
284 )
285 } else {
286 Box::new(std::iter::empty())
287 }
288 }
289 }
290 }
291}
292
293impl Serialize for RecordBatch {
294 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
295 where
296 S: Serializer,
297 {
298 let mut s = serializer.serialize_struct("record", 2)?;
301 s.serialize_field("schema", &**self.schema.arrow_schema())?;
302
303 let columns = self.df_record_batch.columns();
304 let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
305 let vec = columns
306 .iter()
307 .map(|c| c.serialize_to_json())
308 .collect::<std::result::Result<Vec<_>, _>>()
309 .map_err(S::Error::custom)?;
310
311 s.serialize_field("columns", &vec)?;
312 s.end()
313 }
314}
315
316pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
318 let batches_len = batches.len();
319 if batches_len == 0 {
320 return Ok(RecordBatch::new_empty(schema));
321 }
322
323 let record_batch = compute::concat_batches(
324 schema.arrow_schema(),
325 batches.iter().map(|x| x.df_record_batch()),
326 )
327 .context(ArrowComputeSnafu)?;
328
329 Ok(RecordBatch::from_df_record_batch(schema, record_batch))
331}
332
333pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
344 let json_type = json_array.data_type();
345 if json_type == schema_type {
346 return Ok(json_array.clone());
347 }
348
349 let json_array = json_array.as_struct();
350 let array_fields = json_array.fields();
351 let array_columns = json_array.columns();
352 let ArrowDataType::Struct(schema_fields) = schema_type else {
353 unreachable!()
354 };
355 let mut aligned = Vec::with_capacity(schema_fields.len());
356
357 let mut i = 0; let mut j = 0; while i < schema_fields.len() && j < array_fields.len() {
364 let schema_field = &schema_fields[i];
365 let array_field = &array_fields[j];
366 if schema_field.name() == array_field.name() {
367 if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
368 aligned.push(align_json_array(
371 &array_columns[j],
372 schema_field.data_type(),
373 )?);
374 } else {
375 aligned.push(array_columns[j].clone());
376 }
377 j += 1;
378 } else {
379 aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
380 }
381 i += 1;
382 }
383 if i < schema_fields.len() {
384 for field in &schema_fields[i..] {
385 aligned.push(new_null_array(field.data_type(), json_array.len()));
386 }
387 }
388 ensure!(
389 j == array_fields.len(),
390 AlignJsonArraySnafu {
391 reason: format!(
392 "this json array has more fields {:?}",
393 array_fields[j..]
394 .iter()
395 .map(|x| x.name())
396 .collect::<Vec<_>>(),
397 )
398 }
399 );
400
401 let json_array =
402 StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
403 .context(NewDfRecordBatchSnafu)?;
404 Ok(Arc::new(json_array))
405}
406
407fn maybe_align_json_array_with_schema(
408 schema: &ArrowSchemaRef,
409 arrays: Vec<ArrayRef>,
410) -> Result<Vec<ArrayRef>> {
411 if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
412 return Ok(arrays);
413 }
414
415 let mut aligned = Vec::with_capacity(arrays.len());
416 for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
417 if !is_json_extension_type(field) {
418 aligned.push(array);
419 continue;
420 }
421
422 let json_array = align_json_array(&array, field.data_type())?;
423 aligned.push(json_array);
424 }
425 Ok(aligned)
426}
427
428#[cfg(test)]
429mod tests {
430 use std::sync::Arc;
431
432 use datatypes::arrow::array::{
433 AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
434 };
435 use datatypes::arrow::datatypes::{
436 DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
437 };
438 use datatypes::arrow_array::StringArray;
439 use datatypes::data_type::ConcreteDataType;
440 use datatypes::schema::{ColumnSchema, Schema};
441 use datatypes::vectors::{StringVector, UInt32Vector};
442
443 use super::*;
444
/// Exercises `align_json_array` with a table of inputs: a struct array, the struct
/// fields to align it with, and either the expected aligned columns or the expected
/// error message.
#[test]
fn test_align_json_array() -> Result<()> {
    // One alignment scenario.
    struct TestCase {
        // The array handed to `align_json_array`.
        json_array: ArrayRef,
        // The struct type to align the array with.
        schema_type: DataType,
        // Either the expected aligned array, or the expected error message.
        expected: std::result::Result<ArrayRef, String>,
    }

    impl TestCase {
        // Wraps `schema_type` into a struct type, and the expected columns (if any)
        // into a struct array with those same fields and no null buffer.
        fn new(
            json_array: StructArray,
            schema_type: Fields,
            expected: std::result::Result<Vec<ArrayRef>, String>,
        ) -> Self {
            Self {
                json_array: Arc::new(json_array),
                schema_type: DataType::Struct(schema_type.clone()),
                expected: expected
                    .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
            }
        }

        // Runs the alignment and compares the outcome with the expectation. An
        // unexpected error is propagated so that the test fails with it.
        fn test(self) -> Result<()> {
            let result = align_json_array(&self.json_array, &self.schema_type);
            match (result, self.expected) {
                (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                (Ok(json_array), Err(e)) => {
                    panic!("expecting error {e} but actually get: {json_array:?}")
                }
                (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                (Err(e), Ok(_)) => return Err(e),
            }
            Ok(())
        }
    }

    // A struct array without any field is aligned with a complex struct type:
    // every schema field is filled with nulls.
    TestCase::new(
        StructArray::new_empty_fields(2, None),
        Fields::from(vec![
            Field::new("int", DataType::Int64, true),
            Field::new_struct(
                "nested",
                vec![Field::new("bool", DataType::Boolean, true)],
                true,
            ),
            Field::new("string", DataType::Utf8, true),
        ]),
        Ok(vec![
            Arc::new(Int64Array::new_null(2)) as ArrayRef,
            Arc::new(StructArray::new_null(
                Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                2,
            )),
            Arc::new(StringArray::new_null(2)),
        ]),
    )
    .test()?;

    // A trailing schema field that the array lacks is appended as a null column.
    TestCase::new(
        StructArray::from(vec![(
            Arc::new(Field::new("float", DataType::Float64, true)),
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
        )]),
        Fields::from(vec![
            Field::new("float", DataType::Float64, true),
            Field::new("string", DataType::Utf8, true),
        ]),
        Ok(vec![
            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            Arc::new(StringArray::new_null(3)),
        ]),
    )
    .test()?;

    // Missing fields are filled with nulls both at the top level ("bool") and inside
    // a nested struct ("nested.float"); existing columns are kept.
    TestCase::new(
        StructArray::from(vec![
            (
                Arc::new(Field::new_list(
                    "list",
                    Field::new_list_field(DataType::Int64, true),
                    true,
                )),
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])) as ArrayRef,
            ),
            (
                Arc::new(Field::new_struct(
                    "nested",
                    vec![Field::new("int", DataType::Int64, true)],
                    true,
                )),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                )])),
            ),
            (
                Arc::new(Field::new("string", DataType::Utf8, true)),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ),
        ]),
        Fields::from(vec![
            Field::new("bool", DataType::Boolean, true),
            Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("float", DataType::Float64, true),
                    Field::new("int", DataType::Int64, true),
                ],
                true,
            ),
            Field::new("string", DataType::Utf8, true),
        ]),
        Ok(vec![
            Arc::new(BooleanArray::new_null(3)) as ArrayRef,
            Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                Some(vec![Some(1)]),
                None,
                Some(vec![Some(2), Some(3)]),
            ])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("float", DataType::Float64, true)),
                    Arc::new(Float64Array::new_null(3)) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("int", DataType::Int64, true)),
                    Arc::new(Int64Array::from(vec![-1, -2, -3])),
                ),
            ])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ]),
    )
    .test()?;

    // An array field ("j") that does not exist in the schema makes the alignment fail.
    TestCase::new(
        StructArray::try_from(vec![
            ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
            ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
        ])
        .unwrap(),
        Fields::from(vec![Field::new("i", DataType::Int64, true)]),
        Err(
            r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
                .to_string(),
        ),
    )
    .test()?;
    Ok(())
}
603
/// Builds a two-column batch from vectors and checks its rows, columns, schema,
/// name lookup and the round trip through the underlying record batch.
#[test]
fn test_record_batch() {
    let fields = vec![
        Field::new("c1", DataType::UInt32, false),
        Field::new("c2", DataType::UInt32, false),
    ];
    let arrow_schema = Arc::new(ArrowSchema::new(fields));
    let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());

    let columns: Vec<VectorRef> = vec![
        Arc::new(UInt32Vector::from_slice([1, 2, 3])),
        Arc::new(UInt32Vector::from_slice([4, 5, 6])),
    ];
    let expected: Vec<ArrayRef> = vec![
        Arc::new(UInt32Array::from_iter_values([1, 2, 3])),
        Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
    ];

    let batch = RecordBatch::new(schema.clone(), columns).unwrap();
    assert_eq!(3, batch.num_rows());
    assert_eq!(expected, batch.df_record_batch().columns());
    assert_eq!(schema, batch.schema);

    // Columns can be looked up by name; unknown names yield `None`.
    for (name, array) in ["c1", "c2"].into_iter().zip(&expected) {
        assert_eq!(array, batch.column_by_name(name).unwrap());
    }
    assert!(batch.column_by_name("c3").is_none());

    // Re-wrapping the underlying record batch gives an equal batch.
    let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
    assert_eq!(batch, converted);
    assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
}
634
/// Serializes a single-column batch to JSON and compares it with the exact expected text.
#[test]
pub fn test_serialize_recordbatch() {
    let number_column = ColumnSchema::new("number", ConcreteDataType::uint32_datatype(), false);
    let schema = Arc::new(Schema::try_new(vec![number_column]).unwrap());

    let numbers = (0..10).collect::<Vec<u32>>();
    let vector: VectorRef = Arc::new(UInt32Vector::from_slice(numbers));
    let batch = RecordBatch::new(schema, vec![vector]).unwrap();

    let expected = r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#;
    let output = serde_json::to_string(&batch).unwrap();
    assert_eq!(expected, output);
}
654
/// Slices two rows out of a four-row batch, checks both columns, and verifies that
/// an out-of-range slice is rejected.
#[test]
fn test_record_batch_slice() {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
        ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
    ]));
    let numbers: VectorRef = Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4]));
    let strings: VectorRef = Arc::new(StringVector::from(vec![
        None,
        Some("hello"),
        Some("greptime"),
        None,
    ]));

    let sliced = RecordBatch::new(schema, vec![numbers, strings])
        .unwrap()
        .slice(1, 2)
        .expect("recordbatch slice");

    let actual_numbers = sliced.column(0).as_primitive::<UInt32Type>();
    assert_eq!(&UInt32Array::from_iter_values([2u32, 3]), actual_numbers);

    let actual_strings = sliced.column(1).as_string::<i32>();
    assert_eq!(&StringArray::from(vec!["hello", "greptime"]), actual_strings);

    // The sliced batch only has two rows, so rows 1..6 are out of range.
    assert!(sliced.slice(1, 5).is_err());
}
686
/// Merges two identical four-row batches and checks the merged row count.
#[test]
fn test_merge_record_batch() {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
        ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
    ]));

    // Both inputs hold the same data, so one builder produces each of them.
    let build_batch = || {
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        RecordBatch::new(schema.clone(), columns).unwrap()
    };
    let batches = [build_batch(), build_batch()];

    let merged = merge_record_batches(schema.clone(), &batches).expect("merge recordbatch");
    assert_eq!(merged.num_rows(), 8);
}
720}