use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::read::flat_projection::FlatProjectionMapper;
/// Only cache repeated vectors that are no longer than this number of rows.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;

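/// Handles projection for a region, dispatching to the mapper that matches the
/// chosen memory format (primary key encoded vs. flat).
///
/// A minimal usage sketch (the `metadata` value and the projection indices below
/// are illustrative assumptions, not taken from a real caller):
///
/// ```ignore
/// let mapper = ProjectionMapper::new(&metadata, [2, 1].into_iter(), false)?;
/// // Output columns follow the projection order.
/// let schema = mapper.output_schema();
/// ```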
pub enum ProjectionMapper {
    /// Mapper for the primary key format, which converts [Batch]es.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Mapper for the flat format, which converts Arrow record batches.
    Flat(FlatProjectionMapper),
}

impl ProjectionMapper {
    /// Returns a new mapper for the given `projection` indices, using the flat
    /// mapper when `flat_format` is true.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize> + Clone,
        flat_format: bool,
    ) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
                metadata, projection,
            )?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::new(metadata, projection)?,
            ))
        }
    }

    /// Returns a new mapper that outputs all columns in the region.
    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::all(metadata)?,
            ))
        }
    }

    /// Returns the metadata of the region to read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.metadata(),
            ProjectionMapper::Flat(m) => m.metadata(),
        }
    }

    /// Returns true if the projection contains tag columns
    /// (always false for the flat mapper).
    pub(crate) fn has_tags(&self) -> bool {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.has_tags(),
            ProjectionMapper::Flat(_) => false,
        }
    }

    /// Returns ids of the projected columns.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
            ProjectionMapper::Flat(m) => m.column_ids(),
        }
    }

    /// Returns the schema of converted [RecordBatch]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
            ProjectionMapper::Flat(m) => m.output_schema(),
        }
    }

    /// Returns the inner [PrimaryKeyProjectionMapper], if any.
    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(m) => Some(m),
            ProjectionMapper::Flat(_) => None,
        }
    }

    /// Returns the inner [FlatProjectionMapper], if any.
    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(_) => None,
            ProjectionMapper::Flat(m) => Some(m),
        }
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub fn empty_record_batch(&self) -> RecordBatch {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
            ProjectionMapper::Flat(m) => m.empty_record_batch(),
        }
    }
}

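/// Handles projection for the primary key format and converts a projected
/// [Batch] into a projected [RecordBatch].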
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region to read.
    metadata: RegionMetadataRef,
    /// Where each output column comes from in an input [Batch].
    batch_indices: Vec<BatchIndex>,
    /// Whether the projection contains tag (primary key) columns.
    has_tags: bool,
    /// Codec used to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema of converted [RecordBatch]es.
    output_schema: SchemaRef,
    /// Ids of the projected columns.
    column_ids: Vec<ColumnId>,
    /// Ids and data types of field columns in the input [Batch].
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Whether the original projection was empty.
    is_empty_projection: bool,
}

impl PrimaryKeyProjectionMapper {
    /// Returns a new mapper for the given `projection` indices.
    ///
    /// If the projection is empty, the mapper outputs [RecordBatch]es without
    /// any column but with the correct row count.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let mut projection: Vec<_> = projection.collect();
        let is_empty_projection = projection.is_empty();
        if is_empty_projection {
            // Read the time index column so readers still have a column to scan;
            // `is_empty_projection` keeps the output schema empty.
            projection.push(metadata.time_index_column_pos());
        }

        let mut column_schemas = Vec::with_capacity(projection.len());
        let mut column_ids = Vec::with_capacity(projection.len());
        for idx in &projection {
            // Ensures the index is in bounds and collects the column id and schema.
            let column = metadata
                .column_metadatas
                .get(*idx)
                .context(InvalidRequestSnafu {
                    region_id: metadata.region_id,
                    reason: format!("projection index {} is out of bound", idx),
                })?;

            column_ids.push(column.column_id);
            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
        }

        let codec = build_primary_key_codec(metadata);
        if is_empty_projection {
            return Ok(PrimaryKeyProjectionMapper {
                metadata: metadata.clone(),
                batch_indices: vec![],
                has_tags: false,
                codec,
                output_schema: Arc::new(Schema::new(vec![])),
                column_ids,
                batch_fields: vec![],
                is_empty_projection,
            });
        }

        let output_schema = Arc::new(Schema::new(column_schemas));
        let batch_fields = Batch::projected_fields(metadata, &column_ids);

        // Field column id => index of the field column in the batch.
        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        for idx in &projection {
            let column = &metadata.column_metadatas[*idx];
            // Locates the column in a batch by its semantic type.
            let batch_index = match column.semantic_type {
                SemanticType::Tag => {
                    // The column is part of the primary key, so it must have an index.
                    let index = metadata.primary_key_index(column.column_id).unwrap();
                    has_tags = true;
                    BatchIndex::Tag((index, column.column_id))
                }
                SemanticType::Timestamp => BatchIndex::Timestamp,
                SemanticType::Field => {
                    let index = field_id_to_index[&column.column_id];
                    BatchIndex::Field(index)
                }
            };
            batch_indices.push(batch_index);
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

    /// Returns a new mapper that projects all columns in the region.
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata of the region to read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns true if the projection contains tag columns.
    pub(crate) fn has_tags(&self) -> bool {
        self.has_tags
    }

    /// Returns ids of the projected columns.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.column_ids
    }

    /// Returns ids and data types of field columns an input [Batch] should contain.
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

    /// Returns the schema of converted [RecordBatch]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

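    /// Converts a [Batch] to a [RecordBatch] with the output schema.
    ///
    /// The batch must match the projection: its fields must be exactly the
    /// columns returned by [PrimaryKeyProjectionMapper::batch_fields], in the
    /// same order (enforced by the debug assertions in the body).
    ///
    /// A hedged sketch of a call site (`batch` and `cache` are assumed to be
    /// built elsewhere; see the tests in this file for a concrete setup):
    ///
    /// ```ignore
    /// let record_batch = mapper.convert(&batch, &cache)?;
    /// ```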
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            // Nothing to project: only the row count matters.
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

        // Decodes primary key values when tags are projected, reusing values
        // already cached on the batch if present.
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    // A tag value is constant within a batch, so build (or fetch
                    // from the cache) a vector that repeats it `num_rows` times.
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}

/// Index of a column in the output [RecordBatch], pointing at where its data
/// comes from in an input [Batch].
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// Index in the decoded primary key and the column id of the tag.
    Tag((usize, ColumnId)),
    /// The time index column.
    Timestamp,
    /// Index of the column in the [Batch] fields.
    Field(usize),
}

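/// Returns a vector that repeats `value` for `num_rows` rows, reusing a cached
/// vector when the cache already holds one that is long enough; otherwise it
/// creates a new vector and caches it if it is small enough.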
fn repeated_vector_with_cache(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
    cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<VectorRef> {
    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
        match vector.len().cmp(&num_rows) {
            // The cached vector is too short; build a new one below.
            Ordering::Less => (),
            Ordering::Equal => return Ok(vector),
            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
        }
    }

    // Creates a new vector and updates the cache, skipping vectors that are too
    // large to be worth caching.
    let vector = new_repeated_vector(data_type, value, num_rows)?;
    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
    }

    Ok(vector)
}

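/// Creates a new vector that repeats `value` for `num_rows` rows by replicating
/// a single-element vector.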
fn new_repeated_vector(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
) -> common_recordbatch::error::Result<VectorRef> {
    let mut mutable_vector = data_type.create_mutable_vector(1);
    mutable_vector
        .try_push_value_ref(&value.as_value_ref())
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    // Replicates the single-element vector `num_rows` times.
    let base_vector = mutable_vector.to_vector();
    Ok(base_vector.replicate(&[num_rows]))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;

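    /// Creates a [Batch] for tests: tag values are encoded into the primary key
    /// and every field column repeats its value for `num_rows` rows.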
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
                .is_none()
        );
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let pk_mapper = mapper.as_primary_key().unwrap();
        assert!(pk_mapper.batch_fields().is_empty());
        assert!(!pk_mapper.has_tags);
        assert!(pk_mapper.batch_indices.is_empty());
        assert!(pk_mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

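    /// Creates an Arrow record batch in the flat format for tests: tag and field
    /// columns, an optional time index column, then the encoded primary key,
    /// sequence, and op type columns.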
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Tag columns, each repeating its value.
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns, each repeating its value.
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Optional time index column.
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // Encodes the tag values into a dictionary-encoded primary key column.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert!(flat_mapper.batch_schema().is_empty());

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}