// mito2/memtable/bulk/part_reader.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Instant;
17
18use datatypes::arrow::array::BooleanArray;
19use datatypes::arrow::record_batch::RecordBatch;
20use mito_codec::row_converter::PrimaryKeyFilter;
21use parquet::arrow::ProjectionMask;
22use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
23use snafu::ResultExt;
24use store_api::storage::SequenceRange;
25
26use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
27use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
28use crate::memtable::bulk::part::EncodedBulkPart;
29use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
30use crate::memtable::{MemScanMetrics, MemScanMetricsData};
31use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
32use crate::sst::parquet::file_range::TagDecodeState;
33use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
34use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};
35
/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {
    /// Shared iteration context (read format, predicates, prefilter mode).
    context: BulkIterContextRef,
    /// Indices of the row groups that still need to be read.
    row_groups_to_read: VecDeque<usize>,
    /// Reader over the row group currently being drained; `None` when the
    /// iterator was created with no row groups to read.
    current_reader: Option<ParquetRecordBatchReader>,
    /// Builder that creates a reader for each row group on demand.
    builder: MemtableRowGroupReaderBuilder,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Cached skip_fields for current row group.
    current_skip_fields: bool,
    /// Primary key filter for prefiltering before convert_batch.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}
53
54impl EncodedBulkPartIter {
55    /// Creates a new [BulkPartIter].
56    pub fn try_new(
57        encoded_part: &EncodedBulkPart,
58        context: BulkIterContextRef,
59        mut row_groups_to_read: VecDeque<usize>,
60        sequence: Option<SequenceRange>,
61        mem_scan_metrics: Option<MemScanMetrics>,
62    ) -> error::Result<Self> {
63        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
64        let data = encoded_part.data().clone();
65        let series_count = encoded_part.metadata().num_series as usize;
66
67        // TODO(fys): Nested projection pushdown to the memtable layer is not supported yet.
68        let root_indices = context
69            .read_format()
70            .parquet_read_columns()
71            .root_indices_iter();
72        let projection_mask =
73            ProjectionMask::roots(parquet_meta.file_metadata().schema_descr(), root_indices);
74        let builder =
75            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
76
77        // Build PK filter if applicable (flat format with dictionary-encoded PKs).
78        let pk_filter = context.build_pk_filter();
79
80        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
81            Some(first_row_group) => {
82                let skip_fields = context.pre_filter_mode().skip_fields();
83                let reader = builder.build_row_group_reader(first_row_group, None)?;
84                (Some(reader), skip_fields)
85            }
86            None => (None, false),
87        };
88
89        Ok(Self {
90            context,
91            row_groups_to_read,
92            current_reader: init_reader,
93            builder,
94            sequence,
95            current_skip_fields,
96            pk_filter,
97            metrics: MemScanMetricsData {
98                total_series: series_count,
99                ..Default::default()
100            },
101            mem_scan_metrics,
102        })
103    }
104
105    fn report_mem_scan_metrics(&mut self) {
106        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
107            mem_scan_metrics.merge_inner(&self.metrics);
108        }
109    }
110
111    /// Fetches next non-empty record batch.
112    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
113        let start = Instant::now();
114
115        let Some(current) = &mut self.current_reader else {
116            // All row group exhausted.
117            self.metrics.scan_cost += start.elapsed();
118            return Ok(None);
119        };
120
121        for batch in current {
122            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
123            if let Some(batch) = apply_combined_filters(
124                &self.context,
125                &self.sequence,
126                batch,
127                self.current_skip_fields,
128                self.pk_filter
129                    .as_mut()
130                    .map(|f| f as &mut dyn PrimaryKeyFilter),
131                &mut self.metrics,
132            )? {
133                // Update metrics
134                self.metrics.num_batches += 1;
135                self.metrics.num_rows += batch.num_rows();
136                self.metrics.scan_cost += start.elapsed();
137                return Ok(Some(batch));
138            }
139        }
140
141        // Previous row group exhausted, read next row group
142        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
143            // Compute skip_fields for this row group
144            self.current_skip_fields = self.context.pre_filter_mode().skip_fields();
145
146            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
147            let current = self.current_reader.insert(next_reader);
148
149            for batch in current {
150                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
151                if let Some(batch) = apply_combined_filters(
152                    &self.context,
153                    &self.sequence,
154                    batch,
155                    self.current_skip_fields,
156                    self.pk_filter
157                        .as_mut()
158                        .map(|f| f as &mut dyn PrimaryKeyFilter),
159                    &mut self.metrics,
160                )? {
161                    // Update metrics
162                    self.metrics.num_batches += 1;
163                    self.metrics.num_rows += batch.num_rows();
164                    self.metrics.scan_cost += start.elapsed();
165                    return Ok(Some(batch));
166                }
167            }
168        }
169
170        self.metrics.scan_cost += start.elapsed();
171        Ok(None)
172    }
173}
174
175impl Iterator for EncodedBulkPartIter {
176    type Item = error::Result<RecordBatch>;
177
178    fn next(&mut self) -> Option<Self::Item> {
179        let result = self.next_record_batch().transpose();
180
181        // Report metrics when iteration is complete
182        if result.is_none() {
183            self.report_mem_scan_metrics();
184        }
185
186        result
187    }
188}
189
190impl Drop for EncodedBulkPartIter {
191    fn drop(&mut self) {
192        common_telemetry::debug!(
193            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
194            self.context.region_id(),
195            self.metrics.total_series,
196            self.metrics.num_rows,
197            self.metrics.num_batches,
198            self.metrics.scan_cost,
199            self.metrics.prefilter_cost,
200            self.metrics.prefilter_rows_filtered
201        );
202
203        // Report MemScanMetrics if not already reported
204        self.report_mem_scan_metrics();
205
206        READ_ROWS_TOTAL
207            .with_label_values(&["bulk_memtable"])
208            .inc_by(self.metrics.num_rows as u64);
209        READ_STAGE_ELAPSED
210            .with_label_values(&["scan_memtable"])
211            .observe(self.metrics.scan_cost.as_secs_f64());
212    }
213}
214
/// Iterator for reading record batches from a bulk part.
///
/// Iterates through one or more RecordBatches, applying filters and projections.
pub struct BulkPartBatchIter {
    /// Queue of RecordBatches to process.
    batches: VecDeque<RecordBatch>,
    /// Iterator context for filtering and projection.
    context: BulkIterContextRef,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Primary key filter for prefiltering before convert_batch.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}
232
233impl BulkPartBatchIter {
234    /// Creates a new [BulkPartBatchIter] from multiple RecordBatches.
235    pub fn new(
236        batches: Vec<RecordBatch>,
237        context: BulkIterContextRef,
238        sequence: Option<SequenceRange>,
239        series_count: usize,
240        mem_scan_metrics: Option<MemScanMetrics>,
241    ) -> Self {
242        let pk_filter = context.build_pk_filter();
243
244        Self {
245            batches: VecDeque::from(batches),
246            context,
247            sequence,
248            pk_filter,
249            metrics: MemScanMetricsData {
250                total_series: series_count,
251                ..Default::default()
252            },
253            mem_scan_metrics,
254        }
255    }
256
257    /// Creates a new [BulkPartBatchIter] from a single RecordBatch.
258    pub fn from_single(
259        record_batch: RecordBatch,
260        context: BulkIterContextRef,
261        sequence: Option<SequenceRange>,
262        series_count: usize,
263        mem_scan_metrics: Option<MemScanMetrics>,
264    ) -> Self {
265        Self::new(
266            vec![record_batch],
267            context,
268            sequence,
269            series_count,
270            mem_scan_metrics,
271        )
272    }
273
274    fn report_mem_scan_metrics(&mut self) {
275        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
276            mem_scan_metrics.merge_inner(&self.metrics);
277        }
278    }
279
280    /// Applies projection to the RecordBatch if needed.
281    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
282        let projection_indices = self
283            .context
284            .read_format()
285            .parquet_read_columns()
286            .root_indices();
287        if projection_indices.len() == record_batch.num_columns() {
288            return Ok(record_batch);
289        }
290
291        record_batch
292            .project(projection_indices)
293            .context(ComputeArrowSnafu)
294    }
295
296    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
297        let start = Instant::now();
298
299        // Apply projection first.
300        let projected_batch = self.apply_projection(record_batch)?;
301
302        // Apply combined filtering (both predicate and sequence filters)
303        let skip_fields = self.context.pre_filter_mode().skip_fields();
304
305        let Some(filtered_batch) = apply_combined_filters(
306            &self.context,
307            &self.sequence,
308            projected_batch,
309            skip_fields,
310            self.pk_filter
311                .as_mut()
312                .map(|f| f as &mut dyn PrimaryKeyFilter),
313            &mut self.metrics,
314        )?
315        else {
316            self.metrics.scan_cost += start.elapsed();
317            return Ok(None);
318        };
319
320        // Update metrics
321        self.metrics.num_batches += 1;
322        self.metrics.num_rows += filtered_batch.num_rows();
323        self.metrics.scan_cost += start.elapsed();
324
325        Ok(Some(filtered_batch))
326    }
327}
328
329impl Iterator for BulkPartBatchIter {
330    type Item = error::Result<RecordBatch>;
331
332    fn next(&mut self) -> Option<Self::Item> {
333        // Process batches until we find a non-empty one or run out
334        while let Some(batch) = self.batches.pop_front() {
335            match self.process_batch(batch) {
336                Ok(Some(result)) => return Some(Ok(result)),
337                Ok(None) => continue, // This batch was filtered out, try next
338                Err(e) => {
339                    self.report_mem_scan_metrics();
340                    return Some(Err(e));
341                }
342            }
343        }
344
345        // No more batches
346        self.report_mem_scan_metrics();
347        None
348    }
349}
350
351impl Drop for BulkPartBatchIter {
352    fn drop(&mut self) {
353        common_telemetry::debug!(
354            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
355            self.context.region_id(),
356            self.metrics.total_series,
357            self.metrics.num_rows,
358            self.metrics.num_batches,
359            self.metrics.scan_cost,
360            self.metrics.prefilter_cost,
361            self.metrics.prefilter_rows_filtered
362        );
363
364        // Report MemScanMetrics if not already reported
365        self.report_mem_scan_metrics();
366
367        READ_ROWS_TOTAL
368            .with_label_values(&["bulk_memtable"])
369            .inc_by(self.metrics.num_rows as u64);
370        READ_STAGE_ELAPSED
371            .with_label_values(&["scan_memtable"])
372            .observe(self.metrics.scan_cost.as_secs_f64());
373    }
374}
375
376/// Applies both predicate filtering and sequence filtering in a single pass.
377/// Returns None if the filtered batch is empty.
378///
379/// # Panics
380/// Panics if the format is not flat.
381fn apply_combined_filters(
382    context: &BulkIterContext,
383    sequence: &Option<SequenceRange>,
384    record_batch: RecordBatch,
385    skip_fields: bool,
386    pk_filter: Option<&mut dyn PrimaryKeyFilter>,
387    metrics: &mut MemScanMetricsData,
388) -> error::Result<Option<RecordBatch>> {
389    // Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead.
390    let record_batch = if let Some(pk_filter) = pk_filter {
391        let rows_before = record_batch.num_rows();
392        let prefilter_start = Instant::now();
393        let pk_col_idx = primary_key_column_index(record_batch.num_columns());
394        match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? {
395            Some(batch) => {
396                metrics.prefilter_cost += prefilter_start.elapsed();
397                metrics.prefilter_rows_filtered += rows_before - batch.num_rows();
398                batch
399            }
400            None => {
401                metrics.prefilter_cost += prefilter_start.elapsed();
402                metrics.prefilter_rows_filtered += rows_before;
403                return Ok(None);
404            }
405        }
406    } else {
407        record_batch
408    };
409
410    // Converts the format to the flat format.
411    let record_batch = context.read_format().convert_batch(record_batch, None)?;
412
413    let num_rows = record_batch.num_rows();
414    let mut combined_filter = None;
415    let mut tag_decode_state = TagDecodeState::new();
416
417    // Apply predicate filters using the shared method.
418    if !context.base.filters.is_empty() {
419        let predicate_mask = context.base.compute_filter_mask_flat(
420            &record_batch,
421            skip_fields,
422            &mut tag_decode_state,
423        )?;
424        // If predicate filters out the entire batch, return None early
425        let Some(mask) = predicate_mask else {
426            return Ok(None);
427        };
428        combined_filter = Some(BooleanArray::from(mask));
429    }
430
431    // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
432    if let Some(sequence) = sequence {
433        let sequence_column =
434            record_batch.column(sequence_column_index(record_batch.num_columns()));
435        let sequence_filter = sequence
436            .filter(&sequence_column)
437            .context(ComputeArrowSnafu)?;
438        // Combine with existing filter using AND operation
439        combined_filter = match combined_filter {
440            None => Some(sequence_filter),
441            Some(existing_filter) => {
442                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
443                    .context(ComputeArrowSnafu)?;
444                Some(and_result)
445            }
446        };
447    }
448
449    // Apply the combined filter if any filters were applied
450    let Some(filter_array) = combined_filter else {
451        // No filters applied, return original batch
452        return Ok(Some(record_batch));
453    };
454    let select_count = filter_array.true_count();
455    if select_count == 0 {
456        return Ok(None);
457    }
458    if select_count == num_rows {
459        return Ok(Some(record_batch));
460    }
461    let filtered_batch =
462        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
463            .context(ComputeArrowSnafu)?;
464
465    Ok(Some(filtered_batch))
466}
467
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::test_util::sst_util::new_primary_key;

    /// Exercises BulkPartBatchIter over a single batch: no filters,
    /// a sequence filter, and projection + predicate combined.
    #[test]
    fn test_bulk_part_batch_iter() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create test data
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Create primary key dictionary array with properly encoded PKs
        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let pk3 = new_primary_key(&["key3"]);
        let values = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
            pk3.as_slice(),
        ]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Create a minimal region metadata for testing
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );
        // Iterates all rows.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Creates iter with sequence filter (only include sequences <= 2)
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        // Creates iter with projection and predicate.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

    /// Verifies that BulkPartBatchIter yields multiple queued batches
    /// unchanged when no filters or projection apply.
    #[test]
    fn test_bulk_part_batch_iter_multiple_batches() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create first batch with 2 rows
        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000],
        ));
        let values_1 = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
        ]));
        let keys_1 = UInt32Array::from(vec![0, 1]);
        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_1,
                field1_1,
                timestamp_1,
                primary_key_1,
                sequence_1,
                op_type_1,
            ],
        )
        .unwrap();

        // Create second batch with 3 rows
        let pk3 = new_primary_key(&["key3"]);
        let pk4 = new_primary_key(&["key4"]);
        let pk5 = new_primary_key(&["key5"]);
        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![3000, 4000, 5000],
        ));
        let values_2 = Arc::new(BinaryArray::from_iter_values([
            pk3.as_slice(),
            pk4.as_slice(),
            pk5.as_slice(),
        ]));
        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_2,
                field1_2,
                timestamp_2,
                primary_key_2,
                sequence_2,
                op_type_2,
            ],
        )
        .unwrap();

        // Create region metadata
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );

        // Create iterator with multiple batches
        let expect_batches = vec![batch1, batch2];
        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);

        // Collect all results
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(expect_batches, result);
    }
}