1use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::{Helper, VectorRef};
28use serde::ser::{Error, SerializeStruct};
29use serde::{Serialize, Serializer};
30use snafu::{OptionExt, ResultExt, ensure};
31
32use crate::DfRecordBatch;
33use crate::error::{
34 self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
35 NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
36};
37
/// A two-dimensional, column-oriented batch of data that pairs a Greptime
/// [`SchemaRef`] with the underlying DataFusion/Arrow record batch.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    /// The Greptime schema describing the batch's columns.
    pub schema: SchemaRef,
    // The wrapped Arrow record batch that actually owns the column arrays.
    df_record_batch: DfRecordBatch,
}
44
45impl RecordBatch {
46 pub fn new<I: IntoIterator<Item = VectorRef>>(
48 schema: SchemaRef,
49 columns: I,
50 ) -> Result<RecordBatch> {
51 let columns: Vec<_> = columns.into_iter().collect();
52 let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
53
54 let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
62
63 let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
64
65 let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
66 .context(error::NewDfRecordBatchSnafu)?;
67
68 Ok(RecordBatch {
69 schema,
70 df_record_batch,
71 })
72 }
73
74 pub fn to_df_record_batch<I: IntoIterator<Item = VectorRef>>(
75 arrow_schema: ArrowSchemaRef,
76 columns: I,
77 ) -> Result<DfRecordBatch> {
78 let columns: Vec<_> = columns.into_iter().collect();
79 let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
80
81 let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?;
89
90 let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?;
91
92 let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays)
93 .context(error::NewDfRecordBatchSnafu)?;
94
95 Ok(df_record_batch)
96 }
97
98 fn cast_view_arrays(
99 schema: &ArrowSchemaRef,
100 mut arrays: Vec<ArrayRef>,
101 ) -> Result<Vec<ArrayRef>> {
102 for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
103 let expected = f.data_type();
104 let actual = a.data_type();
105 if matches!(
106 (expected, actual),
107 (ArrowDataType::Utf8View, ArrowDataType::Utf8)
108 | (ArrowDataType::BinaryView, ArrowDataType::Binary)
109 ) {
110 *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
111 }
112 }
113 Ok(arrays)
114 }
115
116 pub fn new_empty(schema: SchemaRef) -> RecordBatch {
118 let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
119 RecordBatch {
120 schema,
121 df_record_batch,
122 }
123 }
124
125 pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
127 let df_record_batch = DfRecordBatch::try_new_with_options(
128 schema.arrow_schema().clone(),
129 vec![],
130 &RecordBatchOptions::new().with_row_count(Some(num_rows)),
131 )
132 .context(error::NewDfRecordBatchSnafu)?;
133 Ok(RecordBatch {
134 schema,
135 df_record_batch,
136 })
137 }
138
139 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
140 let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
141 let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
142 ProjectArrowRecordBatchSnafu {
143 schema: self.schema.clone(),
144 projection: indices.to_vec(),
145 }
146 })?;
147
148 Ok(Self {
149 schema,
150 df_record_batch,
151 })
152 }
153
    /// Wraps an existing [`DfRecordBatch`] together with a Greptime schema.
    ///
    /// NOTE(review): no check is performed that `schema` actually matches
    /// `df_record_batch`'s Arrow schema — callers are responsible for
    /// passing a consistent pair.
    pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
        RecordBatch {
            schema,
            df_record_batch,
        }
    }
163
    /// Returns a reference to the wrapped DataFusion record batch.
    #[inline]
    pub fn df_record_batch(&self) -> &DfRecordBatch {
        &self.df_record_batch
    }

    /// Consumes `self`, returning the wrapped DataFusion record batch.
    #[inline]
    pub fn into_df_record_batch(self) -> DfRecordBatch {
        self.df_record_batch
    }

    /// Returns all columns as Arrow arrays.
    #[inline]
    pub fn columns(&self) -> &[ArrayRef] {
        self.df_record_batch.columns()
    }

    /// Returns the column at `idx`.
    ///
    /// Delegates to Arrow's `column`, which panics on an out-of-bounds index.
    #[inline]
    pub fn column(&self, idx: usize) -> &ArrayRef {
        self.df_record_batch.column(idx)
    }

    /// Looks up a column by name, returning `None` when absent.
    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
        self.df_record_batch.column_by_name(name)
    }

    /// Returns the number of columns in the batch.
    #[inline]
    pub fn num_columns(&self) -> usize {
        self.df_record_batch.num_columns()
    }

    /// Returns the number of rows in the batch.
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.df_record_batch.num_rows()
    }
197
198 pub fn column_vectors(
199 &self,
200 table_name: &str,
201 table_schema: SchemaRef,
202 ) -> Result<HashMap<String, VectorRef>> {
203 let mut vectors = HashMap::with_capacity(self.num_columns());
204
205 for (field, array) in self
207 .df_record_batch
208 .schema()
209 .fields()
210 .iter()
211 .zip(self.df_record_batch.columns().iter())
212 {
213 let column_name = field.name();
214 let column_schema =
215 table_schema
216 .column_schema_by_name(column_name)
217 .context(ColumnNotExistsSnafu {
218 table_name,
219 column_name,
220 })?;
221 let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
222 let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
223 .context(ArrowComputeSnafu)?;
224 Helper::try_into_vector(array).context(DataTypesSnafu)?
225 } else {
226 Helper::try_into_vector(array).context(DataTypesSnafu)?
227 };
228
229 let _ = vectors.insert(column_name.clone(), vector);
230 }
231
232 Ok(vectors)
233 }
234
235 pub fn pretty_print(&self) -> String {
237 pretty_format_batches(slice::from_ref(&self.df_record_batch))
238 .map(|t| t.to_string())
239 .unwrap_or("failed to pretty display a record batch".to_string())
240 }
241
242 pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
244 ensure!(
245 offset + len <= self.num_rows(),
246 error::RecordBatchSliceIndexOverflowSnafu {
247 size: self.num_rows(),
248 visit_index: offset + len
249 }
250 );
251 let sliced = self.df_record_batch.slice(offset, len);
252 Ok(RecordBatch::from_df_record_batch(
253 self.schema.clone(),
254 sliced,
255 ))
256 }
257
    /// Returns the total buffer memory (in bytes) backing all columns, as
    /// reported by Arrow's `get_buffer_memory_size`.
    ///
    /// NOTE(review): shared buffers (e.g. between slices) are counted once
    /// per referencing array by Arrow — treat this as an estimate.
    pub fn buffer_memory_size(&self) -> usize {
        self.df_record_batch
            .columns()
            .iter()
            .map(|array| array.get_buffer_memory_size())
            .sum()
    }
270
    /// Iterates over column `i`, yielding each value rendered as a `String`
    /// (`None` for null entries).
    ///
    /// String-typed columns (`Utf8`, `LargeUtf8`, `Utf8View`) are read
    /// directly from the Arrow array; any other type falls back to
    /// converting the array into a vector and stringifying each value.
    /// If that conversion fails, an empty iterator is returned.
    pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
        // Shared body for the three string array flavors: walk the indices
        // and stringify valid slots.
        macro_rules! iter {
            ($column: ident) => {
                Box::new(
                    (0..$column.len())
                        .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
                )
            };
        }

        let column = self.df_record_batch.column(i);
        match column.data_type() {
            ArrowDataType::Utf8 => {
                let column = column.as_string::<i32>();
                let iter = iter!(column);
                iter as _
            }
            ArrowDataType::LargeUtf8 => {
                let column = column.as_string::<i64>();
                iter!(column)
            }
            ArrowDataType::Utf8View => {
                let column = column.as_string_view();
                iter!(column)
            }
            _ => {
                // Non-string columns: go through the vector abstraction so
                // every supported type gets a textual representation.
                if let Ok(column) = Helper::try_into_vector(column) {
                    Box::new(
                        (0..column.len())
                            .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
                    )
                } else {
                    // Unconvertible column type: yield nothing rather than
                    // erroring.
                    Box::new(std::iter::empty())
                }
            }
        }
    }
315}
316
317impl Serialize for RecordBatch {
318 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
319 where
320 S: Serializer,
321 {
322 let mut s = serializer.serialize_struct("record", 2)?;
325 s.serialize_field("schema", &**self.schema.arrow_schema())?;
326
327 let columns = self.df_record_batch.columns();
328 let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
329 let vec = columns
330 .iter()
331 .map(|c| c.serialize_to_json())
332 .collect::<std::result::Result<Vec<_>, _>>()
333 .map_err(S::Error::custom)?;
334
335 s.serialize_field("columns", &vec)?;
336 s.end()
337 }
338}
339
340pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
342 let batches_len = batches.len();
343 if batches_len == 0 {
344 return Ok(RecordBatch::new_empty(schema));
345 }
346
347 let record_batch = compute::concat_batches(
348 schema.arrow_schema(),
349 batches.iter().map(|x| x.df_record_batch()),
350 )
351 .context(ArrowComputeSnafu)?;
352
353 Ok(RecordBatch::from_df_record_batch(schema, record_batch))
355}
356
/// Aligns a JSON struct array with the (struct) `schema_type` by inserting
/// all-null columns for schema fields absent from the array, recursing into
/// nested struct fields.
///
/// The two field lists are walked with a two-pointer merge, so matching
/// relies on the array's fields appearing in the same relative order as the
/// schema's fields (presumably both sorted by name — TODO confirm).
///
/// # Errors
/// Returns `AlignJsonArray` when the array contains fields not present in
/// the schema, or an error when rebuilding the struct array fails.
///
/// # Panics
/// Hits `unreachable!()` if `schema_type` is not a struct type while the
/// array's type differs from it — callers must pass a struct `schema_type`.
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
    let json_type = json_array.data_type();
    // Fast path: already aligned.
    if json_type == schema_type {
        return Ok(json_array.clone());
    }

    let json_array = json_array.as_struct();
    let array_fields = json_array.fields();
    let array_columns = json_array.columns();
    let ArrowDataType::Struct(schema_fields) = schema_type else {
        unreachable!()
    };
    let mut aligned = Vec::with_capacity(schema_fields.len());

    // Two-pointer merge: `i` walks the schema fields, `j` the array fields.
    let mut i = 0; let mut j = 0; while i < schema_fields.len() && j < array_fields.len() {
        let schema_field = &schema_fields[i];
        let array_field = &array_fields[j];
        if schema_field.name() == array_field.name() {
            if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
                // Nested struct: align its children recursively.
                aligned.push(align_json_array(
                    &array_columns[j],
                    schema_field.data_type(),
                )?);
            } else {
                aligned.push(array_columns[j].clone());
            }
            j += 1;
        } else {
            // Schema field missing from the array: fill with nulls.
            aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
        }
        i += 1;
    }
    // Remaining schema fields (array exhausted) are all null-filled.
    if i < schema_fields.len() {
        for field in &schema_fields[i..] {
            aligned.push(new_null_array(field.data_type(), json_array.len()));
        }
    }
    // Any leftover array fields have no schema counterpart — that is an
    // alignment error, not something we can null-fill.
    ensure!(
        j == array_fields.len(),
        AlignJsonArraySnafu {
            reason: format!(
                "this json array has more fields {:?}",
                array_fields[j..]
                    .iter()
                    .map(|x| x.name())
                    .collect::<Vec<_>>(),
            )
        }
    );

    // Rebuild with the schema's field list, preserving the original null
    // buffer of the struct array itself.
    let json_array =
        StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
            .context(NewDfRecordBatchSnafu)?;
    Ok(Arc::new(json_array))
}
430
431fn maybe_align_json_array_with_schema(
432 schema: &ArrowSchemaRef,
433 arrays: Vec<ArrayRef>,
434) -> Result<Vec<ArrayRef>> {
435 if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
436 return Ok(arrays);
437 }
438
439 let mut aligned = Vec::with_capacity(arrays.len());
440 for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
441 if !is_json_extension_type(field) {
442 aligned.push(array);
443 continue;
444 }
445
446 let json_array = align_json_array(&array, field.data_type())?;
447 aligned.push(json_array);
448 }
449 Ok(aligned)
450}
451
452#[cfg(test)]
453mod tests {
454 use std::sync::Arc;
455
456 use datatypes::arrow::array::{
457 AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
458 };
459 use datatypes::arrow::datatypes::{
460 DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
461 };
462 use datatypes::arrow_array::StringArray;
463 use datatypes::data_type::ConcreteDataType;
464 use datatypes::schema::{ColumnSchema, Schema};
465 use datatypes::vectors::{StringVector, UInt32Vector};
466
467 use super::*;
468
    /// Exercises `align_json_array` over empty, partially-populated, nested,
    /// and error-producing inputs.
    #[test]
    fn test_align_json_array() -> Result<()> {
        // A single case: an input struct array, the target struct type, and
        // either the expected aligned columns or the expected error text.
        struct TestCase {
            json_array: ArrayRef,
            schema_type: DataType,
            expected: std::result::Result<ArrayRef, String>,
        }

        impl TestCase {
            fn new(
                json_array: StructArray,
                schema_type: Fields,
                expected: std::result::Result<Vec<ArrayRef>, String>,
            ) -> Self {
                Self {
                    json_array: Arc::new(json_array),
                    schema_type: DataType::Struct(schema_type.clone()),
                    expected: expected
                        .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
                }
            }

            // Runs the alignment and checks the outcome against the
            // expectation, panicking on an unexpected success.
            fn test(self) -> Result<()> {
                let result = align_json_array(&self.json_array, &self.schema_type);
                match (result, self.expected) {
                    (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
                    (Ok(json_array), Err(e)) => {
                        panic!("expecting error {e} but actually get: {json_array:?}")
                    }
                    (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
                    (Err(e), Ok(_)) => return Err(e),
                }
                Ok(())
            }
        }

        // Case 1: an array with no fields aligns to all-null columns for
        // every schema field, including a nested struct.
        TestCase::new(
            StructArray::new_empty_fields(2, None),
            Fields::from(vec![
                Field::new("int", DataType::Int64, true),
                Field::new_struct(
                    "nested",
                    vec![Field::new("bool", DataType::Boolean, true)],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Int64Array::new_null(2)) as ArrayRef,
                Arc::new(StructArray::new_null(
                    Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
                    2,
                )),
                Arc::new(StringArray::new_null(2)),
            ]),
        )
        .test()?;

        // Case 2: a present field is passed through; the missing trailing
        // field is null-filled.
        TestCase::new(
            StructArray::from(vec![(
                Arc::new(Field::new("float", DataType::Float64, true)),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
            )]),
            Fields::from(vec![
                Field::new("float", DataType::Float64, true),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
                Arc::new(StringArray::new_null(3)),
            ]),
        )
        .test()?;

        // Case 3: mixed case — missing leading field, list passthrough, and
        // recursive alignment inside the nested struct.
        TestCase::new(
            StructArray::from(vec![
                (
                    Arc::new(Field::new_list(
                        "list",
                        Field::new_list_field(DataType::Int64, true),
                        true,
                    )),
                    Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                        Some(vec![Some(1)]),
                        None,
                        Some(vec![Some(2), Some(3)]),
                    ])) as ArrayRef,
                ),
                (
                    Arc::new(Field::new_struct(
                        "nested",
                        vec![Field::new("int", DataType::Int64, true)],
                        true,
                    )),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
                    )])),
                ),
                (
                    Arc::new(Field::new("string", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec!["a", "b", "c"])),
                ),
            ]),
            Fields::from(vec![
                Field::new("bool", DataType::Boolean, true),
                Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
                Field::new_struct(
                    "nested",
                    vec![
                        Field::new("float", DataType::Float64, true),
                        Field::new("int", DataType::Int64, true),
                    ],
                    true,
                ),
                Field::new("string", DataType::Utf8, true),
            ]),
            Ok(vec![
                Arc::new(BooleanArray::new_null(3)) as ArrayRef,
                Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]),
                    None,
                    Some(vec![Some(2), Some(3)]),
                ])),
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("float", DataType::Float64, true)),
                        Arc::new(Float64Array::new_null(3)) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("int", DataType::Int64, true)),
                        Arc::new(Int64Array::from(vec![-1, -2, -3])),
                    ),
                ])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ]),
        )
        .test()?;

        // Case 4: the array has a field ("j") the schema lacks — must error.
        TestCase::new(
            StructArray::try_from(vec![
                ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
                ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
            ])
            .unwrap(),
            Fields::from(vec![Field::new("i", DataType::Int64, true)]),
            Err(
                r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
                    .to_string(),
            ),
        )
        .test()?;
        Ok(())
    }
627
    /// Covers construction from vectors, column accessors, and round-tripping
    /// through `from_df_record_batch` / `into_df_record_batch`.
    #[test]
    fn test_record_batch() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::UInt32, false),
            Field::new("c2", DataType::UInt32, false),
        ]));
        let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());

        let c1 = Arc::new(UInt32Vector::from_slice([1, 2, 3]));
        let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6]));
        let columns: Vec<VectorRef> = vec![c1, c2];

        let expected = vec![
            Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef,
            Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
        ];

        let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(expected, batch.df_record_batch().columns());
        assert_eq!(schema, batch.schema);

        // Name-based lookup: present columns found, absent one is None.
        assert_eq!(&expected[0], batch.column_by_name("c1").unwrap());
        assert_eq!(&expected[1], batch.column_by_name("c2").unwrap());
        assert!(batch.column_by_name("c3").is_none());

        // Round-trip through the DataFusion batch preserves equality.
        let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
        assert_eq!(batch, converted);
        assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
    }
658
    /// Pins the exact JSON shape produced by the `Serialize` impl: a
    /// `schema` field plus per-column value lists under `columns`.
    #[test]
    pub fn test_serialize_recordbatch() {
        let column_schemas = vec![ColumnSchema::new(
            "number",
            ConcreteDataType::uint32_datatype(),
            false,
        )];
        let schema = Arc::new(Schema::try_new(column_schemas).unwrap());

        let numbers: Vec<u32> = (0..10).collect();
        let columns = vec![Arc::new(UInt32Vector::from_slice(numbers)) as VectorRef];
        let batch = RecordBatch::new(schema, columns).unwrap();

        let output = serde_json::to_string(&batch).unwrap();
        assert_eq!(
            r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
            output
        );
    }
678
    /// Checks that `slice` returns the requested row window and rejects
    /// out-of-bounds ranges.
    #[test]
    fn test_record_batch_slice() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        // Take rows 1..3 (the middle two).
        let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");

        let expected = &UInt32Array::from_iter_values([2u32, 3]);
        let array = recordbatch.column(0);
        let actual = array.as_primitive::<UInt32Type>();
        assert_eq!(expected, actual);

        let expected = &StringArray::from(vec!["hello", "greptime"]);
        let array = recordbatch.column(1);
        let actual = array.as_string::<i32>();
        assert_eq!(expected, actual);

        // offset + len past the end must be rejected.
        assert!(recordbatch.slice(1, 5).is_err());
    }
710
711 #[test]
712 fn test_merge_record_batch() {
713 let column_schemas = vec![
714 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
715 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
716 ];
717 let schema = Arc::new(Schema::new(column_schemas));
718 let columns: Vec<VectorRef> = vec![
719 Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
720 Arc::new(StringVector::from(vec![
721 None,
722 Some("hello"),
723 Some("greptime"),
724 None,
725 ])),
726 ];
727 let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
728
729 let columns: Vec<VectorRef> = vec![
730 Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
731 Arc::new(StringVector::from(vec![
732 None,
733 Some("hello"),
734 Some("greptime"),
735 None,
736 ])),
737 ];
738 let recordbatch2 = RecordBatch::new(schema.clone(), columns).unwrap();
739
740 let merged = merge_record_batches(schema.clone(), &[recordbatch, recordbatch2])
741 .expect("merge recordbatch");
742 assert_eq!(merged.num_rows(), 8);
743 }
744}