use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::read::flat_projection::FlatProjectionMapper;

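/// Only cache repeated vectors whose length is no more than this value.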
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;

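/// Handles column projection and converts projected batches into output [`RecordBatch`]es.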
pub enum ProjectionMapper {
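    /// Mapper for batches in the primary key format.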
    PrimaryKey(PrimaryKeyProjectionMapper),
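    /// Mapper for batches in the flat format.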
    Flat(FlatProjectionMapper),
}

impl ProjectionMapper {
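    /// Returns a new mapper for the given `projection` of column indices in `metadata`,
    /// using the flat format when `flat_format` is true.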
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize> + Clone,
        flat_format: bool,
    ) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
                metadata, projection,
            )?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::new(metadata, projection)?,
            ))
        }
    }

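    /// Returns a new mapper that projects all columns in `metadata`.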
    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::all(metadata)?,
            ))
        }
    }

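    /// Returns the region metadata the mapper was built from.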
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.metadata(),
            ProjectionMapper::Flat(m) => m.metadata(),
        }
    }

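    /// Returns the ids of the projected columns.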
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
            ProjectionMapper::Flat(m) => m.column_ids(),
        }
    }

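    /// Returns the schema of converted [`RecordBatch`]es.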
    pub(crate) fn output_schema(&self) -> SchemaRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
            ProjectionMapper::Flat(m) => m.output_schema(),
        }
    }

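    /// Returns the inner primary key mapper, or `None` if this is a flat mapper.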
    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(m) => Some(m),
            ProjectionMapper::Flat(_) => None,
        }
    }

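    /// Returns the inner flat mapper, or `None` if this is a primary key mapper.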
    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(_) => None,
            ProjectionMapper::Flat(m) => Some(m),
        }
    }

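    /// Returns an empty [`RecordBatch`] with the output schema.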
    pub fn empty_record_batch(&self) -> RecordBatch {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
            ProjectionMapper::Flat(m) => m.empty_record_batch(),
        }
    }
}

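/// Handles projection and converts a projected [`Batch`] in the primary key format into
/// a projected [`RecordBatch`].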
pub struct PrimaryKeyProjectionMapper {
    metadata: RegionMetadataRef,
    batch_indices: Vec<BatchIndex>,
    has_tags: bool,
    codec: Arc<dyn PrimaryKeyCodec>,
    output_schema: SchemaRef,
    column_ids: Vec<ColumnId>,
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    is_empty_projection: bool,
}

impl PrimaryKeyProjectionMapper {
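    /// Returns a new mapper for the given `projection` of column indices in `metadata`.
    ///
    /// If `projection` is empty, the mapper still reads the time index column to know the
    /// number of rows, but converted [`RecordBatch`]es have an empty schema.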
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let mut projection: Vec<_> = projection.collect();
        let is_empty_projection = projection.is_empty();
        if is_empty_projection {
            projection.push(metadata.time_index_column_pos());
        }

        let mut column_schemas = Vec::with_capacity(projection.len());
        let mut column_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            let column = metadata
                .column_metadatas
                .get(*idx)
                .context(InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })?;

            column_ids.push(column.column_id);
            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
        }

        let codec = build_primary_key_codec(metadata);
        if is_empty_projection {
            return Ok(PrimaryKeyProjectionMapper {
                metadata: metadata.clone(),
                batch_indices: vec![],
                has_tags: false,
                codec,
                output_schema: Arc::new(Schema::new(vec![])),
                column_ids,
                batch_fields: vec![],
                is_empty_projection,
            });
        }

        let output_schema = Arc::new(Schema::new(column_schemas));
        let batch_fields = Batch::projected_fields(metadata, &column_ids);

        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        for idx in &projection {
            let column = &metadata.column_metadatas[*idx];
            let batch_index = match column.semantic_type {
                SemanticType::Tag => {
                    let index = metadata.primary_key_index(column.column_id).unwrap();
                    has_tags = true;
                    BatchIndex::Tag((index, column.column_id))
                }
                SemanticType::Timestamp => BatchIndex::Timestamp,
                SemanticType::Field => {
                    let index = field_id_to_index[&column.column_id];
                    BatchIndex::Field(index)
                }
            };
            batch_indices.push(batch_index);
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

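    /// Returns a new mapper that projects all columns in `metadata`.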
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

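    /// Returns the region metadata the mapper was built from.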
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

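    /// Returns the ids of the projected columns.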
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.column_ids
    }

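    /// Returns the ids and datatypes of the field columns a projected [`Batch`] should contain.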
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

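    /// Returns the schema of converted [`RecordBatch`]es.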
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

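    /// Returns an empty [`RecordBatch`] with the output schema.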
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

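    /// Converts a projected [`Batch`] into a [`RecordBatch`] with the output schema.
    ///
    /// The field columns of `batch` must match the projection; this is checked by debug
    /// assertions.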
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

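        // Decodes primary key values only when the projection contains tag columns,
        // reusing values already decoded in the batch if present.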
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}

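/// Index of a projected column in the output, pointing to its source in a [`Batch`]:
/// a tag's position in the primary key (with its column id), the time index column,
/// or a field's position in the batch fields.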
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    Tag((usize, ColumnId)),
    Timestamp,
    Field(usize),
}

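/// Returns a vector that repeats `value` for `num_rows` rows, reusing the repeated
/// vector cache when possible and caching newly created vectors that are short enough.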
fn repeated_vector_with_cache(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
    cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<VectorRef> {
    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
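        // Reuses the cached vector if it already covers `num_rows` rows.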
        match vector.len().cmp(&num_rows) {
            Ordering::Less => (),
            Ordering::Equal => return Ok(vector),
            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
        }
    }

    let vector = new_repeated_vector(data_type, value, num_rows)?;
    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
    }

    Ok(vector)
}

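/// Creates a new vector that repeats `value` for `num_rows` rows.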
fn new_repeated_vector(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
) -> common_recordbatch::error::Result<VectorRef> {
    let mut mutable_vector = data_type.create_mutable_vector(1);
    mutable_vector
        .try_push_value_ref(&value.as_value_ref())
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    let base_vector = mutable_vector.to_vector();
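    // Replicates the single-element vector to fill `num_rows` rows.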
    Ok(base_vector.replicate(&[num_rows]))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;

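    /// Builds a [`Batch`] in the primary key format with the given tag values, field columns
    /// and `num_rows` timestamps starting at `ts_start`.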
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

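    /// Renders a [`RecordBatch`] as a pretty-printed table string.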
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
                .is_none()
        );
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let pk_mapper = mapper.as_primary_key().unwrap();
        assert!(pk_mapper.batch_fields().is_empty());
        assert!(!pk_mapper.has_tags);
        assert!(pk_mapper.batch_indices.is_empty());
        assert!(pk_mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

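    /// Builds a flat format arrow [`RecordBatch`] with `k{i}` tag columns, `v{i}` field columns,
    /// an optional timestamp column and the internal primary key, sequence and op type columns.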
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert!(flat_mapper.batch_schema().is_empty());

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}