mito2/read/batch_adapter.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Adapter to convert [`BoxedBatchIterator`] (primary key format) into an iterator
//! of flat-format Arrow [`RecordBatch`]es, allowing memtable iterators that only
//! produce [`Batch`] to feed into the flat read pipeline.
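//!
//! A minimal usage sketch (the `iter`, `metadata` and `codec` values are
//! assumed to come from the surrounding read pipeline; see the tests below
//! for concrete construction):
//!
//! ```ignore
//! let adapter = BatchToRecordBatchAdapter::new(iter, metadata, codec, &[0, 1, 2]);
//! for record_batch in adapter {
//!     let record_batch = record_batch?;
//!     // Each item is a flat-format Arrow RecordBatch.
//! }
//! ```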

use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use datatypes::arrow::datatypes::{Field, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType, Vector};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::error::{
    DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::Batch;
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};

/// Adapts a [`BoxedBatchIterator`] into an `Iterator<Item = Result<RecordBatch>>`
/// producing flat-format record batches.
pub struct BatchToRecordBatchAdapter {
    iter: BoxedBatchIterator,
    codec: Arc<dyn PrimaryKeyCodec>,
    output_schema: SchemaRef,
    projected_pk: Vec<ProjectedPkColumn>,
}

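/// A projected primary key column: its column id, its position within the
/// encoded composite key, and its data type.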
struct ProjectedPkColumn {
    column_id: ColumnId,
    pk_index: usize,
    data_type: ConcreteDataType,
}

impl BatchToRecordBatchAdapter {
    /// Creates a new adapter.
    ///
    /// - `iter`: the source batch iterator producing primary-key-format batches.
    /// - `metadata`: region metadata describing the schema.
    /// - `codec`: codec for decoding the encoded primary key bytes.
    /// - `read_column_ids`: projected column ids to read.
    pub fn new(
        iter: BoxedBatchIterator,
        metadata: RegionMetadataRef,
        codec: Arc<dyn PrimaryKeyCodec>,
        read_column_ids: &[ColumnId],
    ) -> Self {
        let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect();
        let projected_pk = metadata
            .primary_key_columns()
            .enumerate()
            .filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id))
            .map(|(pk_index, column_metadata)| ProjectedPkColumn {
                column_id: column_metadata.column_id,
                pk_index,
                data_type: column_metadata.column_schema.data_type.clone(),
            })
            .collect();
        let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set);

        Self {
            iter,
            codec,
            output_schema,
            projected_pk,
        }
    }

    /// Converts a single [`Batch`] into a flat-format [`RecordBatch`].
    fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        let num_rows = batch.num_rows();

        let pk_values = if let Some(vals) = batch.pk_values() {
            Cow::Borrowed(vals)
        } else {
            Cow::Owned(
                self.codec
                    .decode(batch.primary_key())
                    .context(DecodeSnafu)?,
            )
        };

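        // Flat format column order: projected primary key columns first, then
        // field columns, the time index, and finally the internal
        // __primary_key, __sequence and __op_type columns.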
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
        for pk_column in &self.projected_pk {
            let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
            if pk_column.data_type.is_string() {
                columns.push(build_string_tag_dict_array(
                    value,
                    &pk_column.data_type,
                    num_rows,
                ));
            } else {
                let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?;
                columns.push(array);
            }
        }
        for batch_col in batch.fields() {
            columns.push(batch_col.data.to_arrow_array());
        }

        columns.push(batch.timestamps().to_arrow_array());

        // __primary_key: a dictionary array whose single value is the encoded
        // key bytes, referenced by every row in the batch.
        let pk_bytes = batch.primary_key();
        let values = Arc::new(BinaryArray::from_iter_values([pk_bytes]));
        let keys = UInt32Array::from(vec![0u32; num_rows]);
        let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
        columns.push(pk_dict);

        // __sequence.
        columns.push(batch.sequences().to_arrow_array());

        // __op_type.
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

impl Iterator for BatchToRecordBatchAdapter {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.iter.next()? {
                Ok(batch) => {
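                    // Skip empty batches so downstream consumers never see
                    // zero-row record batches.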
                    if batch.is_empty() {
                        continue;
                    }
                    return Some(self.convert_batch(&batch));
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}

/// Extracts a value for the given primary key column from decoded [`CompositeValues`].
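///
/// Dense values are indexed positionally by `pk_index`; sparse values are
/// looked up by `column_id`. An out-of-range dense index falls back to `Null`.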
fn get_pk_value(
    pk_values: &CompositeValues,
    column_id: ColumnId,
    pk_index: usize,
) -> &datatypes::value::Value {
    match pk_values {
        CompositeValues::Dense(dense) => {
            if pk_index < dense.len() {
                &dense[pk_index].1
            } else {
                &datatypes::value::Value::Null
            }
        }
        CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id),
    }
}

/// Builds an Arrow array of `num_rows` copies of `value`.
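///
/// The value is first converted to a scalar of `data_type`, then expanded into
/// an array by repeating the scalar `num_rows` times.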
fn build_repeated_value_array(
    value: &datatypes::value::Value,
    data_type: &ConcreteDataType,
    num_rows: usize,
) -> Result<ArrayRef> {
    let scalar = value
        .try_to_scalar_value(data_type)
        .context(DataTypeMismatchSnafu)?;
    scalar
        .to_array_of_size(num_rows)
        .context(EvalPartitionFilterSnafu)
}

/// Builds a dictionary-encoded string tag array with one dictionary value.
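///
/// All rows in a [`Batch`] share the same primary key, so every dictionary key
/// points at index 0.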
fn build_string_tag_dict_array(
    value: &datatypes::value::Value,
    data_type: &ConcreteDataType,
    num_rows: usize,
) -> ArrayRef {
    let mut builder = data_type.create_mutable_vector(1);
    builder.push_value_ref(&value.as_value_ref());
    let values = builder.to_vector().to_arrow_array();

    let keys = UInt32Array::from(vec![0u32; num_rows]);
    Arc::new(DictionaryArray::new(keys, values))
}

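/// Computes the flat-format output Arrow schema for the projection: projected
/// tag columns (strings dictionary-encoded), projected field columns, the
/// time index column, and the internal columns, in that order.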
fn compute_output_arrow_schema(
    metadata: &RegionMetadataRef,
    read_column_id_set: &HashSet<ColumnId>,
) -> SchemaRef {
    let mut fields = Vec::new();

    for column_metadata in metadata.primary_key_columns() {
        if !read_column_id_set.contains(&column_metadata.column_id) {
            continue;
        }
        let field = Field::new(
            &column_metadata.column_schema.name,
            column_metadata.column_schema.data_type.as_arrow_type(),
            column_metadata.column_schema.is_nullable(),
        );
        let field = with_field_id(field, column_metadata.column_id);

        if column_metadata.semantic_type == SemanticType::Tag {
            fields.push(tag_maybe_to_dictionary_field(
                &column_metadata.column_schema.data_type,
                &Arc::new(field),
            ));
        } else {
            fields.push(Arc::new(field));
        }
    }

    for column_metadata in metadata.field_columns() {
        if !read_column_id_set.contains(&column_metadata.column_id) {
            continue;
        }
        let field = Field::new(
            &column_metadata.column_schema.name,
            column_metadata.column_schema.data_type.as_arrow_type(),
            column_metadata.column_schema.is_nullable(),
        );
        fields.push(Arc::new(with_field_id(field, column_metadata.column_id)));
    }

    let time_index = metadata.time_index_column();
    let time_index_field = Field::new(
        &time_index.column_schema.name,
        time_index.column_schema.data_type.as_arrow_type(),
        time_index.column_schema.is_nullable(),
    );
    fields.push(Arc::new(with_field_id(
        time_index_field,
        time_index.column_id,
    )));
    fields.extend(internal_fields().iter().cloned());

    Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::new_batch_builder;
    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};

    /// Helper to build the adapter from batches and metadata.
    fn build_adapter(
        batches: Vec<Batch>,
        metadata: &RegionMetadataRef,
        codec: &Arc<dyn PrimaryKeyCodec>,
    ) -> BatchToRecordBatchAdapter {
        let read_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect::<Vec<_>>();
        let iter: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok));
        BatchToRecordBatchAdapter::new(
            iter,
            Arc::clone(metadata),
            Arc::clone(codec),
            &read_column_ids,
        )
    }

    #[test]
    fn test_single_batch_two_tags() {
        // Schema: tag_0(string), tag_1(string), field_0(u64), ts
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            2,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.collect();
        assert_eq!(1, results.len());

        let rb = results[0].as_ref().unwrap();
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        assert_eq!(3, rb.num_rows());
        // 2 tags + 1 field + 1 time index + 3 internal = 7 columns
        assert_eq!(7, rb.num_columns());
    }

    #[test]
    fn test_multiple_batches() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk1 = new_primary_key(&["a", "b"]);
        let batch1 = new_batch_builder(
            &pk1,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            2,
            &[10, 20],
        )
        .build()
        .unwrap();

        let pk2 = new_primary_key(&["c", "d"]);
        let batch2 = new_batch_builder(
            &pk2,
            &[3, 4],
            &[200, 200],
            &[OpType::Put, OpType::Put],
            2,
            &[30, 40],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch1, batch2], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(2, results.len());

        assert_eq!(2, results[0].num_rows());
        assert_eq!(2, results[1].num_rows());
    }

    #[test]
    fn test_empty_batch_skipped() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let empty = Batch::empty();
        let pk = new_primary_key(&["x", "y"]);
        let batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42])
            .build()
            .unwrap();

        let adapter = build_adapter(vec![empty, batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());
        assert_eq!(1, results[0].num_rows());
    }

    #[test]
    fn test_no_tags() {
        // Schema with no primary key columns: field_0(u64), ts
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            });
        builder.primary_key(vec![]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Empty primary key
        let pk = vec![];
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            0,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());

        let rb = &results[0];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        // 0 tags + 1 field + 1 time index + 3 internal = 5 columns
        assert_eq!(5, rb.num_columns());
        assert_eq!(2, rb.num_rows());
    }

    #[test]
    fn test_primary_key_dict_column() {
        // Verify the __primary_key column is a proper dictionary array.
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host", "az"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            2,
            &[5, 6],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch.clone()], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // __primary_key is at num_columns - 3
        let pk_col_idx = rb.num_columns() - 3;
        let pk_array = rb
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .expect("should be DictionaryArray<UInt32>");

        // Should have 2 rows, all pointing to key 0
        assert_eq!(2, pk_array.len());
        assert_eq!(0, pk_array.keys().value(0));
        assert_eq!(0, pk_array.keys().value(1));

        // The single dictionary value should be the encoded pk bytes.
        let values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(1, values.len());
        assert_eq!(batch.primary_key(), values.value(0));
    }

    #[test]
    fn test_sequence_and_op_type_columns() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["a", "b"]);
        let batch = new_batch_builder(
            &pk,
            &[10, 20, 30],
            &[1, 2, 3],
            &[OpType::Put, OpType::Delete, OpType::Put],
            2,
            &[100, 200, 300],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // __sequence is at num_columns - 2
        let seq_idx = rb.num_columns() - 2;
        let seq_array = rb
            .column(seq_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(&[1u64, 2, 3], seq_array.values().as_ref());

        // __op_type is at num_columns - 1
        let op_idx = rb.num_columns() - 1;
        let op_array = rb
            .column(op_idx)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(
            &[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8],
            op_array.values().as_ref()
        );
    }

    #[test]
    fn test_integer_tag_column() {
        // Schema with an integer (non-string) tag: tag_0(u32), field_0(u64), ts
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        builder.primary_key(vec![0]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Encode integer primary key
        let pk = {
            use datatypes::value::ValueRef;
            use mito_codec::row_converter::PrimaryKeyCodecExt;
            let codec_ext = mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![(
                0,
                mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
            )]);
            codec_ext
                .encode([ValueRef::UInt32(42)].into_iter())
                .unwrap()
        };
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            1,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);

        // tag_0 column (index 0) should be a regular (non-dictionary) UInt32 array
        let tag_array = rb
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .expect("integer tag should be a plain UInt32Array");
        assert_eq!(&[42u32, 42], tag_array.values().as_ref());
    }

    #[test]
    fn test_with_precomputed_pk_values() {
        // If pk_values are already set on the Batch, the adapter should use them
        // instead of calling codec.decode().
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["pre", "computed"]);
        let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99])
            .build()
            .unwrap();

        // Decode and set pk_values ahead of time.
        let decoded = codec.decode(&pk).unwrap();
        batch.set_pk_values(decoded);

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();
        assert_eq!(1, rb.num_rows());

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
    }

    #[test]
    fn test_partial_projection_schema_matches_mapper() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_1".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            });
        builder.primary_key(vec![0, 1]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Project tag_0 and field_1; skip tag_1 and field_0.
        let read_column_ids = vec![0, 3];

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            3,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let iter: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter());
        let adapter =
            BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let mapper = FlatProjectionMapper::new(&metadata, [0, 3]).unwrap();
        assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
        // tag_0 + field_1 + ts + 3 internal columns.
        assert_eq!(6, rb.num_columns());
        assert_eq!(3, rb.num_rows());

        let field_1 = rb.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(&[10u64, 20, 30], field_1.values().as_ref());

        let ts = rb
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(&[1i64, 2, 3], ts.values().as_ref());
    }
}