mito2/read/
batch_adapter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Adapter to convert [`BoxedBatchIterator`] (primary key format) into an iterator
16//! of flat-format Arrow [`RecordBatch`]es, allowing memtable iterators that only
17//! produce [`Batch`] to feed into the flat read pipeline.
18
19use std::borrow::Cow;
20use std::collections::HashSet;
21use std::sync::Arc;
22
23use api::v1::SemanticType;
24use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
25use datatypes::arrow::datatypes::{Field, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::prelude::{ConcreteDataType, DataType, Vector};
28use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
29use snafu::ResultExt;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::ColumnId;
32
33use crate::error::{
34    DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result,
35};
36use crate::memtable::BoxedBatchIterator;
37use crate::read::Batch;
38use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
39
40/// Adapts a [`BoxedBatchIterator`] into an `Iterator<Item = Result<RecordBatch>>`
41/// producing flat-format record batches.
42pub struct BatchToRecordBatchAdapter {
43    iter: BoxedBatchIterator,
44    codec: Arc<dyn PrimaryKeyCodec>,
45    output_schema: SchemaRef,
46    projected_pk: Vec<ProjectedPkColumn>,
47}
48
49struct ProjectedPkColumn {
50    column_id: ColumnId,
51    pk_index: usize,
52    data_type: ConcreteDataType,
53}
54
55impl BatchToRecordBatchAdapter {
56    /// Creates a new adapter.
57    ///
58    /// - `iter`: the source batch iterator producing primary-key-format batches.
59    /// - `metadata`: region metadata describing the schema.
60    /// - `codec`: codec for decoding the encoded primary key bytes.
61    /// - `read_column_ids`: projected column ids to read.
62    pub fn new(
63        iter: BoxedBatchIterator,
64        metadata: RegionMetadataRef,
65        codec: Arc<dyn PrimaryKeyCodec>,
66        read_column_ids: &[ColumnId],
67    ) -> Self {
68        let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect();
69        let projected_pk = metadata
70            .primary_key_columns()
71            .enumerate()
72            .filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id))
73            .map(|(pk_index, column_metadata)| ProjectedPkColumn {
74                column_id: column_metadata.column_id,
75                pk_index,
76                data_type: column_metadata.column_schema.data_type.clone(),
77            })
78            .collect();
79        let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set);
80
81        Self {
82            iter,
83            codec,
84            output_schema,
85            projected_pk,
86        }
87    }
88
89    /// Converts a single [`Batch`] into a flat-format [`RecordBatch`].
90    fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
91        let num_rows = batch.num_rows();
92
93        let pk_values = if let Some(vals) = batch.pk_values() {
94            Cow::Borrowed(vals)
95        } else {
96            Cow::Owned(
97                self.codec
98                    .decode(batch.primary_key())
99                    .context(DecodeSnafu)?,
100            )
101        };
102
103        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
104        for pk_column in &self.projected_pk {
105            if pk_column.data_type.is_string() {
106                let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
107                columns.push(build_string_tag_dict_array(
108                    value,
109                    &pk_column.data_type,
110                    num_rows,
111                ));
112            } else {
113                let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
114                let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?;
115                columns.push(array);
116            }
117        }
118        for batch_col in batch.fields() {
119            columns.push(batch_col.data.to_arrow_array());
120        }
121
122        columns.push(batch.timestamps().to_arrow_array());
123
124        // __primary_key
125        let pk_bytes = batch.primary_key();
126        let values = Arc::new(BinaryArray::from_iter_values([pk_bytes]));
127        let keys = UInt32Array::from(vec![0u32; num_rows]);
128        let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
129        columns.push(pk_dict);
130
131        // __sequence.
132        columns.push(batch.sequences().to_arrow_array());
133
134        // __op_type.
135        columns.push(batch.op_types().to_arrow_array());
136
137        RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu)
138    }
139}
140
141impl Iterator for BatchToRecordBatchAdapter {
142    type Item = Result<RecordBatch>;
143
144    fn next(&mut self) -> Option<Self::Item> {
145        loop {
146            match self.iter.next()? {
147                Ok(batch) => {
148                    if batch.is_empty() {
149                        continue;
150                    }
151                    return Some(self.convert_batch(&batch));
152                }
153                Err(e) => return Some(Err(e)),
154            }
155        }
156    }
157}
158
159/// Extracts a value for the given primary key column from decoded [`CompositeValues`].
160fn get_pk_value(
161    pk_values: &CompositeValues,
162    column_id: ColumnId,
163    pk_index: usize,
164) -> &datatypes::value::Value {
165    match pk_values {
166        CompositeValues::Dense(dense) => {
167            if pk_index < dense.len() {
168                &dense[pk_index].1
169            } else {
170                &datatypes::value::Value::Null
171            }
172        }
173        CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id),
174    }
175}
176
177/// Builds an Arrow array of `num_rows` copies of `value`.
178fn build_repeated_value_array(
179    value: &datatypes::value::Value,
180    data_type: &ConcreteDataType,
181    num_rows: usize,
182) -> Result<ArrayRef> {
183    let scalar = value
184        .try_to_scalar_value(data_type)
185        .context(DataTypeMismatchSnafu)?;
186    scalar
187        .to_array_of_size(num_rows)
188        .context(EvalPartitionFilterSnafu)
189}
190
191/// Builds a dictionary-encoded string tag array with one dictionary value.
192fn build_string_tag_dict_array(
193    value: &datatypes::value::Value,
194    data_type: &ConcreteDataType,
195    num_rows: usize,
196) -> ArrayRef {
197    let mut builder = data_type.create_mutable_vector(1);
198    builder.push_value_ref(&value.as_value_ref());
199    let values = builder.to_vector().to_arrow_array();
200
201    let keys = UInt32Array::from(vec![0u32; num_rows]);
202    Arc::new(DictionaryArray::new(keys, values))
203}
204
205fn compute_output_arrow_schema(
206    metadata: &RegionMetadataRef,
207    read_column_id_set: &HashSet<ColumnId>,
208) -> SchemaRef {
209    let mut fields = Vec::new();
210
211    for column_metadata in metadata.primary_key_columns() {
212        if !read_column_id_set.contains(&column_metadata.column_id) {
213            continue;
214        }
215        let field = Arc::new(Field::new(
216            &column_metadata.column_schema.name,
217            column_metadata.column_schema.data_type.as_arrow_type(),
218            column_metadata.column_schema.is_nullable(),
219        ));
220        let field = if column_metadata.semantic_type == SemanticType::Tag {
221            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
222        } else {
223            field
224        };
225        fields.push(field);
226    }
227
228    for column_metadata in metadata.field_columns() {
229        if !read_column_id_set.contains(&column_metadata.column_id) {
230            continue;
231        }
232        let field = Arc::new(Field::new(
233            &column_metadata.column_schema.name,
234            column_metadata.column_schema.data_type.as_arrow_type(),
235            column_metadata.column_schema.is_nullable(),
236        ));
237        fields.push(field);
238    }
239
240    let time_index = metadata.time_index_column();
241    let time_index_field = Arc::new(Field::new(
242        &time_index.column_schema.name,
243        time_index.column_schema.data_type.as_arrow_type(),
244        time_index.column_schema.is_nullable(),
245    ));
246    fields.push(time_index_field);
247    fields.extend(internal_fields().iter().cloned());
248
249    Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
250}
251
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::new_batch_builder;
    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};

    /// Wraps `batches` in an adapter that projects every column of `metadata`.
    fn build_adapter(
        batches: Vec<Batch>,
        metadata: &RegionMetadataRef,
        codec: &Arc<dyn PrimaryKeyCodec>,
    ) -> BatchToRecordBatchAdapter {
        let all_column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect();
        let source: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok));
        BatchToRecordBatchAdapter::new(source, metadata.clone(), codec.clone(), &all_column_ids)
    }

    /// Returns the default SST test metadata together with its primary key codec.
    fn sst_fixture() -> (RegionMetadataRef, Arc<dyn PrimaryKeyCodec>) {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);
        (metadata, codec)
    }

    /// Shorthand for a [`ColumnMetadata`] literal.
    fn column(
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name.to_string(), data_type, nullable),
            semantic_type,
            column_id,
        }
    }

    /// Builds region metadata for region `(0, 0)` from `columns` and `primary_key`.
    fn build_metadata(columns: Vec<ColumnMetadata>, primary_key: Vec<ColumnId>) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        for column_metadata in columns {
            builder.push_column_metadata(column_metadata);
        }
        builder.primary_key(primary_key);
        Arc::new(builder.build().unwrap())
    }

    /// Pulls the first record batch out of `adapter`, panicking if there is none
    /// or if the conversion failed.
    fn first_record_batch(mut adapter: BatchToRecordBatchAdapter) -> RecordBatch {
        adapter.next().unwrap().unwrap()
    }

    /// Asserts that `rb` carries the default flat SST schema of `metadata`.
    fn assert_full_flat_schema(rb: &RecordBatch, metadata: &RegionMetadataRef) {
        let expected_schema = to_flat_sst_arrow_schema(metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
    }

    #[test]
    fn test_single_batch_two_tags() {
        // Schema: tag_0(string), tag_1(string), field_0(u64), ts
        let (metadata, codec) = sst_fixture();

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            2,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let results: Vec<_> = build_adapter(vec![batch], &metadata, &codec).collect();
        assert_eq!(1, results.len());

        let rb = results[0].as_ref().unwrap();
        assert_full_flat_schema(rb, &metadata);
        assert_eq!(3, rb.num_rows());
        // 2 tags + 1 field + 1 time index + 3 internal = 7 columns
        assert_eq!(7, rb.num_columns());
    }

    #[test]
    fn test_multiple_batches() {
        let (metadata, codec) = sst_fixture();

        let first_pk = new_primary_key(&["a", "b"]);
        let first = new_batch_builder(
            &first_pk,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            2,
            &[10, 20],
        )
        .build()
        .unwrap();

        let second_pk = new_primary_key(&["c", "d"]);
        let second = new_batch_builder(
            &second_pk,
            &[3, 4],
            &[200, 200],
            &[OpType::Put, OpType::Put],
            2,
            &[30, 40],
        )
        .build()
        .unwrap();

        let results: Vec<_> = build_adapter(vec![first, second], &metadata, &codec)
            .map(|result| result.unwrap())
            .collect();
        assert_eq!(2, results.len());

        assert_eq!(2, results[0].num_rows());
        assert_eq!(2, results[1].num_rows());
    }

    #[test]
    fn test_empty_batch_skipped() {
        let (metadata, codec) = sst_fixture();

        let pk = new_primary_key(&["x", "y"]);
        let non_empty = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42])
            .build()
            .unwrap();

        let results: Vec<_> = build_adapter(vec![Batch::empty(), non_empty], &metadata, &codec)
            .map(|result| result.unwrap())
            .collect();
        assert_eq!(1, results.len());
        assert_eq!(1, results[0].num_rows());
    }

    #[test]
    fn test_no_tags() {
        // Schema with no primary key columns: field_0(u64), ts
        let metadata = build_metadata(
            vec![
                column(
                    "field_0",
                    ConcreteDataType::uint64_datatype(),
                    true,
                    SemanticType::Field,
                    0,
                ),
                column(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                    SemanticType::Timestamp,
                    1,
                ),
            ],
            vec![],
        );
        let codec = build_primary_key_codec(&metadata);

        // Empty primary key
        let pk: Vec<u8> = Vec::new();
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            0,
            &[10, 20],
        )
        .build()
        .unwrap();

        let results: Vec<_> = build_adapter(vec![batch], &metadata, &codec)
            .map(|result| result.unwrap())
            .collect();
        assert_eq!(1, results.len());

        let rb = &results[0];
        assert_full_flat_schema(rb, &metadata);
        // 0 tags + 1 field + 1 time index + 3 internal = 5 columns
        assert_eq!(5, rb.num_columns());
        assert_eq!(2, rb.num_rows());
    }

    #[test]
    fn test_primary_key_dict_column() {
        // Verify the __primary_key column is a proper dictionary array.
        let (metadata, codec) = sst_fixture();

        let pk = new_primary_key(&["host", "az"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            2,
            &[5, 6],
        )
        .build()
        .unwrap();

        let rb = first_record_batch(build_adapter(vec![batch.clone()], &metadata, &codec));

        // __primary_key is at num_columns - 3
        let pk_array = rb
            .column(rb.num_columns() - 3)
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .expect("should be DictionaryArray<UInt32>");

        // Should have 2 rows, all pointing to key 0
        assert_eq!(2, pk_array.len());
        assert_eq!(0, pk_array.keys().value(0));
        assert_eq!(0, pk_array.keys().value(1));

        // The single dictionary value should be the encoded pk bytes.
        let dictionary = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(1, dictionary.len());
        assert_eq!(batch.primary_key(), dictionary.value(0));
    }

    #[test]
    fn test_sequence_and_op_type_columns() {
        let (metadata, codec) = sst_fixture();

        let pk = new_primary_key(&["a", "b"]);
        let batch = new_batch_builder(
            &pk,
            &[10, 20, 30],
            &[1, 2, 3],
            &[OpType::Put, OpType::Delete, OpType::Put],
            2,
            &[100, 200, 300],
        )
        .build()
        .unwrap();

        let rb = first_record_batch(build_adapter(vec![batch], &metadata, &codec));
        let num_columns = rb.num_columns();

        // __sequence is at num_columns - 2
        let sequences = rb
            .column(num_columns - 2)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(&[1u64, 2, 3], sequences.values().as_ref());

        // __op_type is at num_columns - 1
        let op_types = rb
            .column(num_columns - 1)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(
            &[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8],
            op_types.values().as_ref()
        );
    }

    #[test]
    fn test_integer_tag_column() {
        // Schema with an integer (non-string) tag: tag_0(u32), field_0(u64), ts
        let metadata = build_metadata(
            vec![
                column(
                    "tag_0",
                    ConcreteDataType::uint32_datatype(),
                    false,
                    SemanticType::Tag,
                    0,
                ),
                column(
                    "field_0",
                    ConcreteDataType::uint64_datatype(),
                    true,
                    SemanticType::Field,
                    1,
                ),
                column(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                    SemanticType::Timestamp,
                    2,
                ),
            ],
            vec![0],
        );
        let codec = build_primary_key_codec(&metadata);

        // Encode integer primary key
        let pk = {
            use datatypes::value::ValueRef;
            use mito_codec::row_converter::PrimaryKeyCodecExt;

            let sort_field =
                mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype());
            let dense_codec =
                mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![(0, sort_field)]);
            dense_codec
                .encode([ValueRef::UInt32(42)].into_iter())
                .unwrap()
        };
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            1,
            &[10, 20],
        )
        .build()
        .unwrap();

        let rb = first_record_batch(build_adapter(vec![batch], &metadata, &codec));
        assert_full_flat_schema(&rb, &metadata);

        // tag_0 column (index 0) should be a regular (non-dictionary) UInt32 array
        let tag_array = rb
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .expect("integer tag should be a plain UInt32Array");
        assert_eq!(&[42u32, 42], tag_array.values().as_ref());
    }

    #[test]
    fn test_with_precomputed_pk_values() {
        // If pk_values are already set on the Batch, the adapter should use them
        // instead of calling codec.decode().
        let (metadata, codec) = sst_fixture();

        let pk = new_primary_key(&["pre", "computed"]);
        let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99])
            .build()
            .unwrap();

        // Decode and set pk_values ahead of time.
        batch.set_pk_values(codec.decode(&pk).unwrap());

        let rb = first_record_batch(build_adapter(vec![batch], &metadata, &codec));
        assert_eq!(1, rb.num_rows());
        assert_full_flat_schema(&rb, &metadata);
    }

    #[test]
    fn test_partial_projection_schema_matches_mapper() {
        let metadata = build_metadata(
            vec![
                column(
                    "tag_0",
                    ConcreteDataType::string_datatype(),
                    true,
                    SemanticType::Tag,
                    0,
                ),
                column(
                    "tag_1",
                    ConcreteDataType::string_datatype(),
                    true,
                    SemanticType::Tag,
                    1,
                ),
                column(
                    "field_0",
                    ConcreteDataType::uint64_datatype(),
                    true,
                    SemanticType::Field,
                    2,
                ),
                column(
                    "field_1",
                    ConcreteDataType::uint64_datatype(),
                    true,
                    SemanticType::Field,
                    3,
                ),
                column(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                    SemanticType::Timestamp,
                    4,
                ),
            ],
            vec![0, 1],
        );
        let codec = build_primary_key_codec(&metadata);

        // Project tag_0 and field_1; skip tag_1 and field_0.
        let read_column_ids = vec![0, 3];

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            3,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let source: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter());
        let rb = first_record_batch(BatchToRecordBatchAdapter::new(
            source,
            metadata.clone(),
            codec,
            &read_column_ids,
        ));

        let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
        assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
        // tag_0 + field_1 + ts + 3 internal columns.
        assert_eq!(6, rb.num_columns());
        assert_eq!(3, rb.num_rows());

        let field_1 = rb.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(&[10u64, 20, 30], field_1.values().as_ref());

        let ts = rb
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(&[1i64, 2, 3], ts.values().as_ref());
    }
}