mito2/memtable/bulk/part_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::time::Instant;

use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
use store_api::storage::SequenceRange;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;

/// Iterator for reading data inside a bulk part.
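///
/// # Example
///
/// A minimal sketch of driving the iterator (not compiled as a doctest since
/// the types are crate-internal; `part` and `context` are assumed to be an
/// existing [`EncodedBulkPart`] and [`BulkIterContextRef`]):
///
/// ```ignore
/// let row_groups: VecDeque<usize> =
///     (0..part.metadata().parquet_metadata.num_row_groups()).collect();
/// let iter = EncodedBulkPartIter::try_new(&part, context, row_groups, None, None)?;
/// for batch in iter {
///     // Each item is a non-empty `RecordBatch` with filters already applied.
///     let batch = batch?;
/// }
/// ```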
pub struct EncodedBulkPartIter {
    context: BulkIterContextRef,
    row_groups_to_read: VecDeque<usize>,
    current_reader: Option<ParquetRecordBatchReader>,
    builder: MemtableRowGroupReaderBuilder,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Cached skip_fields for the current row group.
    current_skip_fields: bool,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl EncodedBulkPartIter {
    /// Creates a new [EncodedBulkPartIter].
    pub(crate) fn try_new(
        encoded_part: &EncodedBulkPart,
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
        let data = encoded_part.data().clone();
        let series_count = encoded_part.metadata().num_series as usize;

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
            Some(first_row_group) => {
                let skip_fields = builder.compute_skip_fields(&context, first_row_group);
                let reader = builder.build_row_group_reader(first_row_group, None)?;
                (Some(reader), skip_fields)
            }
            None => (None, false),
        };

        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
            current_skip_fields,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

    /// Fetches the next non-empty record batch.
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        let Some(current) = &mut self.current_reader else {
            // All row groups are exhausted.
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(
                &self.context,
                &self.sequence,
                batch,
                self.current_skip_fields,
            )? {
                // Update metrics
                self.metrics.num_batches += 1;
                self.metrics.num_rows += batch.num_rows();
                self.metrics.scan_cost += start.elapsed();
                return Ok(Some(batch));
            }
        }

        // The previous row group is exhausted; read the next row group.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            // Compute skip_fields for this row group
            self.current_skip_fields = self
                .builder
                .compute_skip_fields(&self.context, next_row_group);

            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(
                    &self.context,
                    &self.sequence,
                    batch,
                    self.current_skip_fields,
                )? {
                    // Update metrics
                    self.metrics.num_batches += 1;
                    self.metrics.num_rows += batch.num_rows();
                    self.metrics.scan_cost += start.elapsed();
                    return Ok(Some(batch));
                }
            }
        }

        self.metrics.scan_cost += start.elapsed();
        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let result = self.next_record_batch().transpose();

        // Report metrics when iteration is complete
        if result.is_none() {
            self.report_mem_scan_metrics();
        }

        result
    }
}

impl Drop for EncodedBulkPartIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

/// Iterator for reading record batches from a bulk part.
///
/// Iterates through one or more RecordBatches, applying filters and projections.
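///
/// # Example
///
/// A minimal sketch mirroring the unit tests below (`batch` and `context` are
/// assumed to be an existing flat-format [`RecordBatch`] and
/// [`BulkIterContextRef`]):
///
/// ```ignore
/// // Only keep rows whose sequence number is <= 2.
/// let iter = BulkPartBatchIter::from_single(
///     batch,
///     context,
///     Some(SequenceRange::LtEq { max: 2 }),
///     0,    // series_count
///     None, // no MemScanMetrics to report to
/// );
/// for filtered in iter {
///     let filtered = filtered?;
/// }
/// ```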
pub struct BulkPartBatchIter {
    /// Queue of RecordBatches to process.
    batches: VecDeque<RecordBatch>,
    /// Iterator context for filtering and projection.
    context: BulkIterContextRef,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl BulkPartBatchIter {
    /// Creates a new [BulkPartBatchIter] from multiple RecordBatches.
    pub fn new(
        batches: Vec<RecordBatch>,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        Self {
            batches: VecDeque::from(batches),
            context,
            sequence,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        }
    }

    /// Creates a new [BulkPartBatchIter] from a single RecordBatch.
    pub fn from_single(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        Self::new(
            vec![record_batch],
            context,
            sequence,
            series_count,
            mem_scan_metrics,
        )
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

    /// Applies projection to the RecordBatch if needed.
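    ///
    /// A sketch of the underlying arrow call (assuming `batch` is any
    /// [`RecordBatch`]):
    ///
    /// ```ignore
    /// // Keep only columns 0 and 2, in that order.
    /// let projected = batch.project(&[0, 2])?;
    /// ```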
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        // Apply projection first.
        let projected_batch = self.apply_projection(record_batch)?;

        // Apply combined filtering (both predicate and sequence filters).
        let skip_fields = match self.context.pre_filter_mode() {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields | PreFilterMode::SkipFieldsOnDelete => true,
        };

        let Some(filtered_batch) =
            apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
        else {
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        // Update metrics
        self.metrics.num_batches += 1;
        self.metrics.num_rows += filtered_batch.num_rows();
        self.metrics.scan_cost += start.elapsed();

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        // Process batches until we find a non-empty one or run out
        while let Some(batch) = self.batches.pop_front() {
            match self.process_batch(batch) {
                Ok(Some(result)) => return Some(Ok(result)),
                Ok(None) => continue, // This batch was filtered out, try next
                Err(e) => {
                    self.report_mem_scan_metrics();
                    return Some(Err(e));
                }
            }
        }

        // No more batches
        self.report_mem_scan_metrics();
        None
    }
}

impl Drop for BulkPartBatchIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        // Report MemScanMetrics if not already reported
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns `None` if the filtered batch is empty.
///
/// # Panics
/// Panics if the format is not flat.
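///
/// # Example
///
/// Conceptually, the predicate mask and the sequence mask are AND-ed into one
/// boolean filter so the batch is filtered only once (a sketch; `predicate_mask`
/// and `sequence_mask` are assumed [`BooleanArray`]s of equal length):
///
/// ```ignore
/// let keep = datatypes::arrow::compute::and(&predicate_mask, &sequence_mask)?;
/// let filtered = datatypes::arrow::compute::filter_record_batch(&record_batch, &keep)?;
/// ```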
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<SequenceRange>,
    record_batch: RecordBatch,
    skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
    // Converts the batch to the flat format first.
    let format = context.read_format().as_flat().unwrap();
    let record_batch = format.convert_batch(record_batch, None)?;

    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;
    let mut tag_decode_state = TagDecodeState::new();

    // First, apply predicate filters using the shared method.
    if !context.base.filters.is_empty() {
        let predicate_mask = context.base.compute_filter_mask_flat(
            &record_batch,
            skip_fields,
            &mut tag_decode_state,
        )?;
        // If predicate filters out the entire batch, return None early
        let Some(mask) = predicate_mask else {
            return Ok(None);
        };
        combined_filter = Some(BooleanArray::from(mask));
    }

    // Filters rows by the given sequence range, only preserving rows whose sequence falls within it.
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter = sequence
            .filter(&sequence_column)
            .context(ComputeArrowSnafu)?;
        // Combine with existing filter using AND operation
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // Apply the combined filter if any filters were applied
    let Some(filter_array) = combined_filter else {
        // No filters applied, return original batch
        return Ok(Some(record_batch));
    };
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    #[test]
    fn test_bulk_part_batch_iter() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create test data
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Create primary key dictionary array
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Create a minimal region metadata for testing
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );
        // Iterates all rows.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Creates iter with sequence filter (only include sequences <= 2)
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        // Creates iter with projection and predicate.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

577
578    #[test]
579    fn test_bulk_part_batch_iter_multiple_batches() {
580        // Create a simple schema
581        let schema = Arc::new(Schema::new(vec![
582            Field::new("key1", DataType::Utf8, false),
583            Field::new("field1", DataType::Int64, false),
584            Field::new(
585                "timestamp",
586                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
587                false,
588            ),
589            Field::new(
590                "__primary_key",
591                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
592                false,
593            ),
594            Field::new("__sequence", DataType::UInt64, false),
595            Field::new("__op_type", DataType::UInt8, false),
596        ]));
597
598        // Create first batch with 2 rows
599        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
600        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
601        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
602            vec![1000, 2000],
603        ));
604        let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"]));
605        let keys_1 = UInt32Array::from(vec![0, 1]);
606        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
607        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
608        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));
609
610        let batch1 = RecordBatch::try_new(
611            schema.clone(),
612            vec![
613                key1_1,
614                field1_1,
615                timestamp_1,
616                primary_key_1,
617                sequence_1,
618                op_type_1,
619            ],
620        )
621        .unwrap();
622
623        // Create second batch with 3 rows
624        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
625        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
626        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
627            vec![3000, 4000, 5000],
628        ));
629        let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"]));
630        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
631        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
632        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
633        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));
634
635        let batch2 = RecordBatch::try_new(
636            schema.clone(),
637            vec![
638                key1_2,
639                field1_2,
640                timestamp_2,
641                primary_key_2,
642                sequence_2,
643                op_type_2,
644            ],
645        )
646        .unwrap();
647
648        // Create region metadata
649        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
650        builder
651            .push_column_metadata(ColumnMetadata {
652                column_schema: ColumnSchema::new(
653                    "key1",
654                    ConcreteDataType::string_datatype(),
655                    false,
656                ),
657                semantic_type: SemanticType::Tag,
658                column_id: 0,
659            })
660            .push_column_metadata(ColumnMetadata {
661                column_schema: ColumnSchema::new(
662                    "field1",
663                    ConcreteDataType::int64_datatype(),
664                    false,
665                ),
666                semantic_type: SemanticType::Field,
667                column_id: 1,
668            })
669            .push_column_metadata(ColumnMetadata {
670                column_schema: ColumnSchema::new(
671                    "timestamp",
672                    ConcreteDataType::timestamp_millisecond_datatype(),
673                    false,
674                ),
675                semantic_type: SemanticType::Timestamp,
676                column_id: 2,
677            })
678            .primary_key(vec![0]);
679
680        let region_metadata = builder.build().unwrap();
681
682        // Create context
683        let context = Arc::new(
684            BulkIterContext::new(
685                Arc::new(region_metadata),
686                None, // No projection
687                None, // No predicate
688                false,
689            )
690            .unwrap(),
691        );
692
693        // Create iterator with multiple batches
694        let expect_batches = vec![batch1, batch2];
695        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);
696
697        // Collect all results
698        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
699        assert_eq!(expect_batches, result);
700    }
701}