1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::RecordBatch;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::flat_projection::FlatProjectionMapper;
37use crate::read::Batch;
38
39const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
42pub enum ProjectionMapper {
44 PrimaryKey(PrimaryKeyProjectionMapper),
46 Flat(FlatProjectionMapper),
48}
49
50impl ProjectionMapper {
51 pub fn new(
53 metadata: &RegionMetadataRef,
54 projection: impl Iterator<Item = usize> + Clone,
55 flat_format: bool,
56 ) -> Result<Self> {
57 if flat_format {
58 Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
59 metadata, projection,
60 )?))
61 } else {
62 Ok(ProjectionMapper::PrimaryKey(
63 PrimaryKeyProjectionMapper::new(metadata, projection)?,
64 ))
65 }
66 }
67
68 pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
70 if flat_format {
71 Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
72 } else {
73 Ok(ProjectionMapper::PrimaryKey(
74 PrimaryKeyProjectionMapper::all(metadata)?,
75 ))
76 }
77 }
78
79 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
81 match self {
82 ProjectionMapper::PrimaryKey(m) => m.metadata(),
83 ProjectionMapper::Flat(m) => m.metadata(),
84 }
85 }
86
87 pub(crate) fn column_ids(&self) -> &[ColumnId] {
90 match self {
91 ProjectionMapper::PrimaryKey(m) => m.column_ids(),
92 ProjectionMapper::Flat(m) => m.column_ids(),
93 }
94 }
95
96 pub(crate) fn output_schema(&self) -> SchemaRef {
98 match self {
99 ProjectionMapper::PrimaryKey(m) => m.output_schema(),
100 ProjectionMapper::Flat(m) => m.output_schema(),
101 }
102 }
103
104 pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
106 match self {
107 ProjectionMapper::PrimaryKey(m) => Some(m),
108 ProjectionMapper::Flat(_) => None,
109 }
110 }
111
112 pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
114 match self {
115 ProjectionMapper::PrimaryKey(_) => None,
116 ProjectionMapper::Flat(m) => Some(m),
117 }
118 }
119
120 pub fn empty_record_batch(&self) -> RecordBatch {
123 match self {
124 ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
125 ProjectionMapper::Flat(m) => m.empty_record_batch(),
126 }
127 }
128}
129
130pub struct PrimaryKeyProjectionMapper {
132 metadata: RegionMetadataRef,
134 batch_indices: Vec<BatchIndex>,
136 has_tags: bool,
138 codec: Arc<dyn PrimaryKeyCodec>,
140 output_schema: SchemaRef,
142 column_ids: Vec<ColumnId>,
145 batch_fields: Vec<(ColumnId, ConcreteDataType)>,
147 is_empty_projection: bool,
149}
150
151impl PrimaryKeyProjectionMapper {
152 pub fn new(
157 metadata: &RegionMetadataRef,
158 projection: impl Iterator<Item = usize>,
159 ) -> Result<PrimaryKeyProjectionMapper> {
160 let mut projection: Vec<_> = projection.collect();
161 let is_empty_projection = projection.is_empty();
163 if is_empty_projection {
164 projection.push(metadata.time_index_column_pos());
166 }
167
168 let mut column_schemas = Vec::with_capacity(projection.len());
169 let mut column_ids = Vec::with_capacity(projection.len());
170 for idx in &projection {
171 let column = metadata
173 .column_metadatas
174 .get(*idx)
175 .context(InvalidRequestSnafu {
176 region_id: metadata.region_id,
177 reason: format!("projection index {} is out of bound", idx),
178 })?;
179
180 column_ids.push(column.column_id);
181 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
183 }
184
185 let codec = build_primary_key_codec(metadata);
186 if is_empty_projection {
187 return Ok(PrimaryKeyProjectionMapper {
189 metadata: metadata.clone(),
190 batch_indices: vec![],
191 has_tags: false,
192 codec,
193 output_schema: Arc::new(Schema::new(vec![])),
194 column_ids,
195 batch_fields: vec![],
196 is_empty_projection,
197 });
198 }
199
200 let output_schema = Arc::new(Schema::new(column_schemas));
202 let batch_fields = Batch::projected_fields(metadata, &column_ids);
204
205 let field_id_to_index: HashMap<_, _> = batch_fields
207 .iter()
208 .enumerate()
209 .map(|(index, (column_id, _))| (*column_id, index))
210 .collect();
211 let mut batch_indices = Vec::with_capacity(projection.len());
213 let mut has_tags = false;
214 for idx in &projection {
215 let column = &metadata.column_metadatas[*idx];
217 let batch_index = match column.semantic_type {
219 SemanticType::Tag => {
220 let index = metadata.primary_key_index(column.column_id).unwrap();
222 has_tags = true;
224 BatchIndex::Tag((index, column.column_id))
227 }
228 SemanticType::Timestamp => BatchIndex::Timestamp,
229 SemanticType::Field => {
230 let index = field_id_to_index[&column.column_id];
232 BatchIndex::Field(index)
233 }
234 };
235 batch_indices.push(batch_index);
236 }
237
238 Ok(PrimaryKeyProjectionMapper {
239 metadata: metadata.clone(),
240 batch_indices,
241 has_tags,
242 codec,
243 output_schema,
244 column_ids,
245 batch_fields,
246 is_empty_projection,
247 })
248 }
249
250 pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
252 PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
253 }
254
255 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
257 &self.metadata
258 }
259
260 pub(crate) fn column_ids(&self) -> &[ColumnId] {
263 &self.column_ids
264 }
265
266 pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
268 &self.batch_fields
269 }
270
271 pub(crate) fn output_schema(&self) -> SchemaRef {
275 self.output_schema.clone()
276 }
277
278 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
280 RecordBatch::new_empty(self.output_schema.clone())
281 }
282
283 pub(crate) fn convert(
287 &self,
288 batch: &Batch,
289 cache_strategy: &CacheStrategy,
290 ) -> common_recordbatch::error::Result<RecordBatch> {
291 if self.is_empty_projection {
292 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
293 }
294
295 debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
296 debug_assert!(self
297 .batch_fields
298 .iter()
299 .zip(batch.fields())
300 .all(|((id, _), batch_col)| *id == batch_col.column_id));
301
302 let pk_values = if self.has_tags {
304 match batch.pk_values() {
305 Some(v) => v.clone(),
306 None => self
307 .codec
308 .decode(batch.primary_key())
309 .map_err(BoxedError::new)
310 .context(ExternalSnafu)?,
311 }
312 } else {
313 CompositeValues::Dense(vec![])
314 };
315
316 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
317 let num_rows = batch.num_rows();
318 for (index, column_schema) in self
319 .batch_indices
320 .iter()
321 .zip(self.output_schema.column_schemas())
322 {
323 match index {
324 BatchIndex::Tag((idx, column_id)) => {
325 let value = match &pk_values {
326 CompositeValues::Dense(v) => &v[*idx].1,
327 CompositeValues::Sparse(v) => v.get_or_null(*column_id),
328 };
329 let vector = repeated_vector_with_cache(
330 &column_schema.data_type,
331 value,
332 num_rows,
333 cache_strategy,
334 )?;
335 columns.push(vector);
336 }
337 BatchIndex::Timestamp => {
338 columns.push(batch.timestamps().clone());
339 }
340 BatchIndex::Field(idx) => {
341 columns.push(batch.fields()[*idx].data.clone());
342 }
343 }
344 }
345
346 RecordBatch::new(self.output_schema.clone(), columns)
347 }
348}
349
350#[derive(Debug, Clone, Copy)]
352enum BatchIndex {
353 Tag((usize, ColumnId)),
355 Timestamp,
357 Field(usize),
359}
360
361fn repeated_vector_with_cache(
363 data_type: &ConcreteDataType,
364 value: &Value,
365 num_rows: usize,
366 cache_strategy: &CacheStrategy,
367) -> common_recordbatch::error::Result<VectorRef> {
368 if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
369 match vector.len().cmp(&num_rows) {
372 Ordering::Less => (),
373 Ordering::Equal => return Ok(vector),
374 Ordering::Greater => return Ok(vector.slice(0, num_rows)),
375 }
376 }
377
378 let vector = new_repeated_vector(data_type, value, num_rows)?;
380 if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
382 cache_strategy.put_repeated_vector(value.clone(), vector.clone());
383 }
384
385 Ok(vector)
386}
387
388fn new_repeated_vector(
390 data_type: &ConcreteDataType,
391 value: &Value,
392 num_rows: usize,
393) -> common_recordbatch::error::Result<VectorRef> {
394 let mut mutable_vector = data_type.create_mutable_vector(1);
395 mutable_vector
396 .try_push_value_ref(value.as_value_ref())
397 .map_err(BoxedError::new)
398 .context(ExternalSnafu)?;
399 let base_vector = mutable_vector.to_vector();
401 Ok(base_vector.replicate(&[num_rows]))
402}
403
404#[cfg(test)]
405mod tests {
406 use std::sync::Arc;
407
408 use api::v1::OpType;
409 use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
410 use datatypes::arrow::datatypes::Field;
411 use datatypes::arrow::util::pretty;
412 use datatypes::value::ValueRef;
413 use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
414 use mito_codec::test_util::TestRegionMetadataBuilder;
415 use store_api::storage::consts::{
416 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
417 };
418
419 use super::*;
420 use crate::cache::CacheManager;
421 use crate::read::BatchBuilder;
422
423 fn new_batch(
424 ts_start: i64,
425 tags: &[i64],
426 fields: &[(ColumnId, i64)],
427 num_rows: usize,
428 ) -> Batch {
429 let converter = DensePrimaryKeyCodec::with_fields(
430 (0..tags.len())
431 .map(|idx| {
432 (
433 idx as u32,
434 SortField::new(ConcreteDataType::int64_datatype()),
435 )
436 })
437 .collect(),
438 );
439 let primary_key = converter
440 .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
441 .unwrap();
442
443 let mut builder = BatchBuilder::new(primary_key);
444 builder
445 .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
446 (0..num_rows).map(|i| ts_start + i as i64 * 1000),
447 )))
448 .unwrap()
449 .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
450 .unwrap()
451 .op_types_array(Arc::new(UInt8Array::from_iter_values(
452 (0..num_rows).map(|_| OpType::Put as u8),
453 )))
454 .unwrap();
455 for (column_id, field) in fields {
456 builder
457 .push_field_array(
458 *column_id,
459 Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
460 *field, num_rows,
461 ))),
462 )
463 .unwrap();
464 }
465 builder.build().unwrap()
466 }
467
468 fn print_record_batch(record_batch: RecordBatch) -> String {
469 pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
470 .unwrap()
471 .to_string()
472 }
473
474 #[test]
475 fn test_projection_mapper_all() {
476 let metadata = Arc::new(
477 TestRegionMetadataBuilder::default()
478 .num_tags(2)
479 .num_fields(2)
480 .build(),
481 );
482 let mapper = ProjectionMapper::all(&metadata, false).unwrap();
484 assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
485 assert_eq!(
486 [
487 (3, ConcreteDataType::int64_datatype()),
488 (4, ConcreteDataType::int64_datatype())
489 ],
490 mapper.as_primary_key().unwrap().batch_fields()
491 );
492
493 let cache = CacheManager::builder().vector_cache_size(1024).build();
495 let cache = CacheStrategy::EnableAll(Arc::new(cache));
496 let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
497 let record_batch = mapper
498 .as_primary_key()
499 .unwrap()
500 .convert(&batch, &cache)
501 .unwrap();
502 let expect = "\
503+---------------------+----+----+----+----+
504| ts | k0 | k1 | v0 | v1 |
505+---------------------+----+----+----+----+
506| 1970-01-01T00:00:00 | 1 | 2 | 3 | 4 |
507| 1970-01-01T00:00:01 | 1 | 2 | 3 | 4 |
508| 1970-01-01T00:00:02 | 1 | 2 | 3 | 4 |
509+---------------------+----+----+----+----+";
510 assert_eq!(expect, print_record_batch(record_batch));
511
512 assert!(cache
513 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
514 .is_some());
515 assert!(cache
516 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
517 .is_some());
518 assert!(cache
519 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
520 .is_none());
521 let record_batch = mapper
522 .as_primary_key()
523 .unwrap()
524 .convert(&batch, &cache)
525 .unwrap();
526 assert_eq!(expect, print_record_batch(record_batch));
527 }
528
529 #[test]
530 fn test_projection_mapper_with_projection() {
531 let metadata = Arc::new(
532 TestRegionMetadataBuilder::default()
533 .num_tags(2)
534 .num_fields(2)
535 .build(),
536 );
537 let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
539 assert_eq!([4, 1], mapper.column_ids());
540 assert_eq!(
541 [(4, ConcreteDataType::int64_datatype())],
542 mapper.as_primary_key().unwrap().batch_fields()
543 );
544
545 let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
546 let cache = CacheManager::builder().vector_cache_size(1024).build();
547 let cache = CacheStrategy::EnableAll(Arc::new(cache));
548 let record_batch = mapper
549 .as_primary_key()
550 .unwrap()
551 .convert(&batch, &cache)
552 .unwrap();
553 let expect = "\
554+----+----+
555| v1 | k0 |
556+----+----+
557| 4 | 1 |
558| 4 | 1 |
559| 4 | 1 |
560+----+----+";
561 assert_eq!(expect, print_record_batch(record_batch));
562 }
563
564 #[test]
565 fn test_projection_mapper_empty_projection() {
566 let metadata = Arc::new(
567 TestRegionMetadataBuilder::default()
568 .num_tags(2)
569 .num_fields(2)
570 .build(),
571 );
572 let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
574 assert_eq!([0], mapper.column_ids()); assert!(mapper.output_schema().is_empty());
576 let pk_mapper = mapper.as_primary_key().unwrap();
577 assert!(pk_mapper.batch_fields().is_empty());
578 assert!(!pk_mapper.has_tags);
579 assert!(pk_mapper.batch_indices.is_empty());
580 assert!(pk_mapper.is_empty_projection);
581
582 let batch = new_batch(0, &[1, 2], &[], 3);
583 let cache = CacheManager::builder().vector_cache_size(1024).build();
584 let cache = CacheStrategy::EnableAll(Arc::new(cache));
585 let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
586 assert_eq!(3, record_batch.num_rows());
587 assert_eq!(0, record_batch.num_columns());
588 assert!(record_batch.schema.is_empty());
589 }
590
591 fn new_flat_batch(
592 ts_start: Option<i64>,
593 idx_tags: &[(usize, i64)],
594 idx_fields: &[(usize, i64)],
595 num_rows: usize,
596 ) -> datatypes::arrow::record_batch::RecordBatch {
597 let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
598 let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
599
600 for (i, tag) in idx_tags {
604 let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
605 *tag, num_rows,
606 ))) as _;
607 columns.push(array);
608 fields.push(Field::new(
609 format!("k{i}"),
610 datatypes::arrow::datatypes::DataType::Int64,
611 true,
612 ));
613 }
614
615 for (i, field) in idx_fields {
617 let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
618 *field, num_rows,
619 ))) as _;
620 columns.push(array);
621 fields.push(Field::new(
622 format!("v{i}"),
623 datatypes::arrow::datatypes::DataType::Int64,
624 true,
625 ));
626 }
627
628 if let Some(ts_start) = ts_start {
630 let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
631 (0..num_rows).map(|i| ts_start + i as i64 * 1000),
632 )) as _;
633 columns.push(timestamps);
634 fields.push(Field::new(
635 "ts",
636 datatypes::arrow::datatypes::DataType::Timestamp(
637 datatypes::arrow::datatypes::TimeUnit::Millisecond,
638 None,
639 ),
640 true,
641 ));
642 }
643
644 let converter = DensePrimaryKeyCodec::with_fields(
647 (0..idx_tags.len())
648 .map(|idx| {
649 (
650 idx as u32,
651 SortField::new(ConcreteDataType::int64_datatype()),
652 )
653 })
654 .collect(),
655 );
656 let encoded_pk = converter
657 .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
658 .unwrap();
659
660 let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
662 let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
663 let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
664 let pk_array =
665 Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
666 columns.push(pk_array);
667 fields.push(Field::new_dictionary(
668 PRIMARY_KEY_COLUMN_NAME,
669 datatypes::arrow::datatypes::DataType::UInt32,
670 datatypes::arrow::datatypes::DataType::Binary,
671 false,
672 ));
673
674 columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
676 fields.push(Field::new(
677 SEQUENCE_COLUMN_NAME,
678 datatypes::arrow::datatypes::DataType::UInt64,
679 false,
680 ));
681
682 columns.push(Arc::new(UInt8Array::from_iter_values(
684 (0..num_rows).map(|_| OpType::Put as u8),
685 )) as _);
686 fields.push(Field::new(
687 OP_TYPE_COLUMN_NAME,
688 datatypes::arrow::datatypes::DataType::UInt8,
689 false,
690 ));
691
692 let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
693
694 datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
695 }
696
697 #[test]
698 fn test_flat_projection_mapper_all() {
699 let metadata = Arc::new(
700 TestRegionMetadataBuilder::default()
701 .num_tags(2)
702 .num_fields(2)
703 .build(),
704 );
705 let mapper = ProjectionMapper::all(&metadata, true).unwrap();
706 assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
707 assert_eq!(
708 [
709 (1, ConcreteDataType::int64_datatype()),
710 (2, ConcreteDataType::int64_datatype()),
711 (3, ConcreteDataType::int64_datatype()),
712 (4, ConcreteDataType::int64_datatype()),
713 (0, ConcreteDataType::timestamp_millisecond_datatype())
714 ],
715 mapper.as_flat().unwrap().batch_schema()
716 );
717
718 let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
719 let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
720 let expect = "\
721+---------------------+----+----+----+----+
722| ts | k0 | k1 | v0 | v1 |
723+---------------------+----+----+----+----+
724| 1970-01-01T00:00:00 | 1 | 2 | 3 | 4 |
725| 1970-01-01T00:00:01 | 1 | 2 | 3 | 4 |
726| 1970-01-01T00:00:02 | 1 | 2 | 3 | 4 |
727+---------------------+----+----+----+----+";
728 assert_eq!(expect, print_record_batch(record_batch));
729 }
730
731 #[test]
732 fn test_flat_projection_mapper_with_projection() {
733 let metadata = Arc::new(
734 TestRegionMetadataBuilder::default()
735 .num_tags(2)
736 .num_fields(2)
737 .build(),
738 );
739 let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
741 assert_eq!([4, 1], mapper.column_ids());
742 assert_eq!(
743 [
744 (1, ConcreteDataType::int64_datatype()),
745 (4, ConcreteDataType::int64_datatype())
746 ],
747 mapper.as_flat().unwrap().batch_schema()
748 );
749
750 let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
751 let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
752 let expect = "\
753+----+----+
754| v1 | k0 |
755+----+----+
756| 4 | 1 |
757| 4 | 1 |
758| 4 | 1 |
759+----+----+";
760 assert_eq!(expect, print_record_batch(record_batch));
761 }
762
763 #[test]
764 fn test_flat_projection_mapper_empty_projection() {
765 let metadata = Arc::new(
766 TestRegionMetadataBuilder::default()
767 .num_tags(2)
768 .num_fields(2)
769 .build(),
770 );
771 let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
773 assert_eq!([0], mapper.column_ids()); assert!(mapper.output_schema().is_empty());
775 let flat_mapper = mapper.as_flat().unwrap();
776 assert!(flat_mapper.batch_schema().is_empty());
777
778 let batch = new_flat_batch(Some(0), &[], &[], 3);
779 let record_batch = flat_mapper.convert(&batch).unwrap();
780 assert_eq!(3, record_batch.num_rows());
781 assert_eq!(0, record_batch.num_columns());
782 assert!(record_batch.schema.is_empty());
783 }
784}