1use std::collections::HashMap;
16use std::slice;
17use std::sync::Arc;
18
19use datafusion::arrow::util::pretty::pretty_format_batches;
20use datafusion_common::arrow::array::ArrayRef;
21use datafusion_common::arrow::compute;
22use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
23use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
24use datatypes::extension::json::is_json_extension_type;
25use datatypes::prelude::DataType;
26use datatypes::schema::SchemaRef;
27use datatypes::vectors::json::array::JsonArray;
28use datatypes::vectors::{Helper, VectorRef};
29use serde::ser::{Error, SerializeStruct};
30use serde::{Serialize, Serializer};
31use snafu::{OptionExt, ResultExt, ensure};
32
33use crate::DfRecordBatch;
34use crate::error::{
35 self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
36 Result,
37};
38
/// A column-oriented batch of rows: GreptimeDB's [`SchemaRef`] paired with the
/// DataFusion (arrow) record batch that actually holds the column data.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    /// The GreptimeDB schema describing the columns of this batch.
    pub schema: SchemaRef,
    // Kept private so the arrow batch can only be built through constructors
    // that keep it consistent with `schema`.
    df_record_batch: DfRecordBatch,
}
45
46impl RecordBatch {
47 pub fn new<I: IntoIterator<Item = VectorRef>>(
49 schema: SchemaRef,
50 columns: I,
51 ) -> Result<RecordBatch> {
52 let columns: Vec<_> = columns.into_iter().collect();
53 let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
54
55 let arrow_arrays = Self::cast_view_arrays(schema.arrow_schema(), arrow_arrays)?;
63
64 let arrow_arrays = maybe_align_json_array_with_schema(schema.arrow_schema(), arrow_arrays)?;
65
66 let df_record_batch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
67 .context(error::NewDfRecordBatchSnafu)?;
68
69 Ok(RecordBatch {
70 schema,
71 df_record_batch,
72 })
73 }
74
75 pub fn to_df_record_batch<I: IntoIterator<Item = VectorRef>>(
76 arrow_schema: ArrowSchemaRef,
77 columns: I,
78 ) -> Result<DfRecordBatch> {
79 let columns: Vec<_> = columns.into_iter().collect();
80 let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect();
81
82 let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?;
90
91 let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?;
92
93 let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays)
94 .context(error::NewDfRecordBatchSnafu)?;
95
96 Ok(df_record_batch)
97 }
98
99 fn cast_view_arrays(
100 schema: &ArrowSchemaRef,
101 mut arrays: Vec<ArrayRef>,
102 ) -> Result<Vec<ArrayRef>> {
103 for (f, a) in schema.fields().iter().zip(arrays.iter_mut()) {
104 let expected = f.data_type();
105 let actual = a.data_type();
106 if matches!(
107 (expected, actual),
108 (ArrowDataType::Utf8View, ArrowDataType::Utf8)
109 | (ArrowDataType::BinaryView, ArrowDataType::Binary)
110 ) {
111 *a = compute::cast(a, expected).context(ArrowComputeSnafu)?;
112 }
113 }
114 Ok(arrays)
115 }
116
117 pub fn new_empty(schema: SchemaRef) -> RecordBatch {
119 let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
120 RecordBatch {
121 schema,
122 df_record_batch,
123 }
124 }
125
126 pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
128 let df_record_batch = DfRecordBatch::try_new_with_options(
129 schema.arrow_schema().clone(),
130 vec![],
131 &RecordBatchOptions::new().with_row_count(Some(num_rows)),
132 )
133 .context(error::NewDfRecordBatchSnafu)?;
134 Ok(RecordBatch {
135 schema,
136 df_record_batch,
137 })
138 }
139
140 pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
141 let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
142 let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
143 ProjectArrowRecordBatchSnafu {
144 schema: self.schema.clone(),
145 projection: indices.to_vec(),
146 }
147 })?;
148
149 Ok(Self {
150 schema,
151 df_record_batch,
152 })
153 }
154
155 pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
159 RecordBatch {
160 schema,
161 df_record_batch,
162 }
163 }
164
165 #[inline]
166 pub fn df_record_batch(&self) -> &DfRecordBatch {
167 &self.df_record_batch
168 }
169
170 #[inline]
171 pub fn into_df_record_batch(self) -> DfRecordBatch {
172 self.df_record_batch
173 }
174
175 #[inline]
176 pub fn columns(&self) -> &[ArrayRef] {
177 self.df_record_batch.columns()
178 }
179
180 #[inline]
181 pub fn column(&self, idx: usize) -> &ArrayRef {
182 self.df_record_batch.column(idx)
183 }
184
185 pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
186 self.df_record_batch.column_by_name(name)
187 }
188
189 #[inline]
190 pub fn num_columns(&self) -> usize {
191 self.df_record_batch.num_columns()
192 }
193
194 #[inline]
195 pub fn num_rows(&self) -> usize {
196 self.df_record_batch.num_rows()
197 }
198
199 pub fn column_vectors(
200 &self,
201 table_name: &str,
202 table_schema: SchemaRef,
203 ) -> Result<HashMap<String, VectorRef>> {
204 let mut vectors = HashMap::with_capacity(self.num_columns());
205
206 for (field, array) in self
208 .df_record_batch
209 .schema()
210 .fields()
211 .iter()
212 .zip(self.df_record_batch.columns().iter())
213 {
214 let column_name = field.name();
215 let column_schema =
216 table_schema
217 .column_schema_by_name(column_name)
218 .context(ColumnNotExistsSnafu {
219 table_name,
220 column_name,
221 })?;
222 let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
223 let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
224 .context(ArrowComputeSnafu)?;
225 Helper::try_into_vector(array).context(DataTypesSnafu)?
226 } else {
227 Helper::try_into_vector(array).context(DataTypesSnafu)?
228 };
229
230 let _ = vectors.insert(column_name.clone(), vector);
231 }
232
233 Ok(vectors)
234 }
235
236 pub fn pretty_print(&self) -> String {
238 pretty_format_batches(slice::from_ref(&self.df_record_batch))
239 .map(|t| t.to_string())
240 .unwrap_or("failed to pretty display a record batch".to_string())
241 }
242
243 pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
245 ensure!(
246 offset + len <= self.num_rows(),
247 error::RecordBatchSliceIndexOverflowSnafu {
248 size: self.num_rows(),
249 visit_index: offset + len
250 }
251 );
252 let sliced = self.df_record_batch.slice(offset, len);
253 Ok(RecordBatch::from_df_record_batch(
254 self.schema.clone(),
255 sliced,
256 ))
257 }
258
259 pub fn buffer_memory_size(&self) -> usize {
265 self.df_record_batch
266 .columns()
267 .iter()
268 .map(|array| array.get_buffer_memory_size())
269 .sum()
270 }
271
272 pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
280 macro_rules! iter {
281 ($column: ident) => {
282 Box::new(
283 (0..$column.len())
284 .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
285 )
286 };
287 }
288
289 let column = self.df_record_batch.column(i);
290 match column.data_type() {
291 ArrowDataType::Utf8 => {
292 let column = column.as_string::<i32>();
293 let iter = iter!(column);
294 iter as _
295 }
296 ArrowDataType::LargeUtf8 => {
297 let column = column.as_string::<i64>();
298 iter!(column)
299 }
300 ArrowDataType::Utf8View => {
301 let column = column.as_string_view();
302 iter!(column)
303 }
304 _ => {
305 if let Ok(column) = Helper::try_into_vector(column) {
306 Box::new(
307 (0..column.len())
308 .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
309 )
310 } else {
311 Box::new(std::iter::empty())
312 }
313 }
314 }
315 }
316}
317
318impl Serialize for RecordBatch {
319 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
320 where
321 S: Serializer,
322 {
323 let mut s = serializer.serialize_struct("record", 2)?;
326 s.serialize_field("schema", &**self.schema.arrow_schema())?;
327
328 let columns = self.df_record_batch.columns();
329 let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
330 let vec = columns
331 .iter()
332 .map(|c| c.serialize_to_json())
333 .collect::<std::result::Result<Vec<_>, _>>()
334 .map_err(S::Error::custom)?;
335
336 s.serialize_field("columns", &vec)?;
337 s.end()
338 }
339}
340
341pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
343 let batches_len = batches.len();
344 if batches_len == 0 {
345 return Ok(RecordBatch::new_empty(schema));
346 }
347
348 let record_batch = compute::concat_batches(
349 schema.arrow_schema(),
350 batches.iter().map(|x| x.df_record_batch()),
351 )
352 .context(ArrowComputeSnafu)?;
353
354 Ok(RecordBatch::from_df_record_batch(schema, record_batch))
356}
357
358fn maybe_align_json_array_with_schema(
359 schema: &ArrowSchemaRef,
360 arrays: Vec<ArrayRef>,
361) -> Result<Vec<ArrayRef>> {
362 if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
363 return Ok(arrays);
364 }
365
366 let mut aligned = Vec::with_capacity(arrays.len());
367 for (field, array) in schema.fields().iter().zip(arrays) {
368 if !is_json_extension_type(field) {
369 aligned.push(array);
370 continue;
371 }
372
373 let json_array = JsonArray::from(&array)
374 .try_align(field.data_type())
375 .context(DataTypesSnafu)?;
376 aligned.push(json_array);
377 }
378 Ok(aligned)
379}
380
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{AsArray, UInt32Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
    use datatypes::arrow_array::StringArray;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{StringVector, UInt32Vector};

    use super::*;

    // Covers construction from vectors, column accessors (by index/name), and
    // round-tripping through the underlying DataFusion record batch.
    #[test]
    fn test_record_batch() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("c1", DataType::UInt32, false),
            Field::new("c2", DataType::UInt32, false),
        ]));
        let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());

        let c1 = Arc::new(UInt32Vector::from_slice([1, 2, 3]));
        let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6]));
        let columns: Vec<VectorRef> = vec![c1, c2];

        // The vectors should surface as these exact arrow arrays.
        let expected = vec![
            Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef,
            Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
        ];

        let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
        assert_eq!(3, batch.num_rows());
        assert_eq!(expected, batch.df_record_batch().columns());
        assert_eq!(schema, batch.schema);

        assert_eq!(&expected[0], batch.column_by_name("c1").unwrap());
        assert_eq!(&expected[1], batch.column_by_name("c2").unwrap());
        assert!(batch.column_by_name("c3").is_none());

        // Rebuilding from the inner DfRecordBatch must yield an equal batch.
        let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
        assert_eq!(batch, converted);
        assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
    }

    // Snapshot test for the serde representation: schema metadata plus
    // JSON-rendered column values.
    #[test]
    pub fn test_serialize_recordbatch() {
        let column_schemas = vec![ColumnSchema::new(
            "number",
            ConcreteDataType::uint32_datatype(),
            false,
        )];
        let schema = Arc::new(Schema::try_new(column_schemas).unwrap());

        let numbers: Vec<u32> = (0..10).collect();
        let columns = vec![Arc::new(UInt32Vector::from_slice(numbers)) as VectorRef];
        let batch = RecordBatch::new(schema, columns).unwrap();

        let output = serde_json::to_string(&batch).unwrap();
        assert_eq!(
            r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","nullable":false,"dict_id":0,"dict_is_ordered":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
            output
        );
    }

    // Slicing must keep the selected rows (including null handling in the
    // string column) and reject out-of-bounds ranges.
    #[test]
    fn test_record_batch_slice() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");

        let expected = &UInt32Array::from_iter_values([2u32, 3]);
        let array = recordbatch.column(0);
        let actual = array.as_primitive::<UInt32Type>();
        assert_eq!(expected, actual);

        let expected = &StringArray::from(vec!["hello", "greptime"]);
        let array = recordbatch.column(1);
        let actual = array.as_string::<i32>();
        assert_eq!(expected, actual);

        // 1 + 5 > 4 rows: must error rather than slice past the end.
        assert!(recordbatch.slice(1, 5).is_err());
    }

    // Merging two 4-row batches with the same schema yields one 8-row batch.
    #[test]
    fn test_merge_record_batch() {
        let column_schemas = vec![
            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
        ];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();

        let columns: Vec<VectorRef> = vec![
            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch2 = RecordBatch::new(schema.clone(), columns).unwrap();

        let merged = merge_record_batches(schema.clone(), &[recordbatch, recordbatch2])
            .expect("merge recordbatch");
        assert_eq!(merged.num_rows(), 8);
    }
}
510}