// mito2/memtable/bulk/part_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::VecDeque;
16use std::time::Instant;
17
18use datatypes::arrow::array::BooleanArray;
19use datatypes::arrow::record_batch::RecordBatch;
20use mito_codec::row_converter::PrimaryKeyFilter;
21use parquet::arrow::ProjectionMask;
22use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
23use snafu::ResultExt;
24use store_api::storage::SequenceRange;
25
26use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
27use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
28use crate::memtable::bulk::part::EncodedBulkPart;
29use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
30use crate::memtable::{MemScanMetrics, MemScanMetricsData};
31use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
32use crate::sst::parquet::file_range::TagDecodeState;
33use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
34use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};
35
/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {
    /// Shared context holding the read format, filters and pre-filter mode.
    context: BulkIterContextRef,
    /// Remaining row group indices to read, consumed front to back.
    row_groups_to_read: VecDeque<usize>,
    /// Reader over the row group currently being scanned, if any.
    current_reader: Option<ParquetRecordBatchReader>,
    /// Builder that creates a [ParquetRecordBatchReader] per row group.
    builder: MemtableRowGroupReaderBuilder,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Cached skip_fields for current row group.
    current_skip_fields: bool,
    /// Primary key filter for prefiltering before convert_batch.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to (taken on first report).
    mem_scan_metrics: Option<MemScanMetrics>,
}
53
54impl EncodedBulkPartIter {
55    /// Creates a new [BulkPartIter].
56    pub fn try_new(
57        encoded_part: &EncodedBulkPart,
58        context: BulkIterContextRef,
59        mut row_groups_to_read: VecDeque<usize>,
60        sequence: Option<SequenceRange>,
61        mem_scan_metrics: Option<MemScanMetrics>,
62    ) -> error::Result<Self> {
63        assert!(context.read_format().as_flat().is_some());
64
65        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
66        let data = encoded_part.data().clone();
67        let series_count = encoded_part.metadata().num_series as usize;
68
69        let projection_mask = ProjectionMask::roots(
70            parquet_meta.file_metadata().schema_descr(),
71            context.read_format().projection_indices().iter().copied(),
72        );
73        let builder =
74            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
75
76        // Build PK filter if applicable (flat format with dictionary-encoded PKs).
77        let pk_filter = context.build_pk_filter();
78
79        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
80            Some(first_row_group) => {
81                let skip_fields = context.pre_filter_mode().skip_fields();
82                let reader = builder.build_row_group_reader(first_row_group, None)?;
83                (Some(reader), skip_fields)
84            }
85            None => (None, false),
86        };
87
88        Ok(Self {
89            context,
90            row_groups_to_read,
91            current_reader: init_reader,
92            builder,
93            sequence,
94            current_skip_fields,
95            pk_filter,
96            metrics: MemScanMetricsData {
97                total_series: series_count,
98                ..Default::default()
99            },
100            mem_scan_metrics,
101        })
102    }
103
104    fn report_mem_scan_metrics(&mut self) {
105        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
106            mem_scan_metrics.merge_inner(&self.metrics);
107        }
108    }
109
110    /// Fetches next non-empty record batch.
111    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
112        let start = Instant::now();
113
114        let Some(current) = &mut self.current_reader else {
115            // All row group exhausted.
116            self.metrics.scan_cost += start.elapsed();
117            return Ok(None);
118        };
119
120        for batch in current {
121            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
122            if let Some(batch) = apply_combined_filters(
123                &self.context,
124                &self.sequence,
125                batch,
126                self.current_skip_fields,
127                self.pk_filter
128                    .as_mut()
129                    .map(|f| f as &mut dyn PrimaryKeyFilter),
130                &mut self.metrics,
131            )? {
132                // Update metrics
133                self.metrics.num_batches += 1;
134                self.metrics.num_rows += batch.num_rows();
135                self.metrics.scan_cost += start.elapsed();
136                return Ok(Some(batch));
137            }
138        }
139
140        // Previous row group exhausted, read next row group
141        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
142            // Compute skip_fields for this row group
143            self.current_skip_fields = self.context.pre_filter_mode().skip_fields();
144
145            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
146            let current = self.current_reader.insert(next_reader);
147
148            for batch in current {
149                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
150                if let Some(batch) = apply_combined_filters(
151                    &self.context,
152                    &self.sequence,
153                    batch,
154                    self.current_skip_fields,
155                    self.pk_filter
156                        .as_mut()
157                        .map(|f| f as &mut dyn PrimaryKeyFilter),
158                    &mut self.metrics,
159                )? {
160                    // Update metrics
161                    self.metrics.num_batches += 1;
162                    self.metrics.num_rows += batch.num_rows();
163                    self.metrics.scan_cost += start.elapsed();
164                    return Ok(Some(batch));
165                }
166            }
167        }
168
169        self.metrics.scan_cost += start.elapsed();
170        Ok(None)
171    }
172}
173
174impl Iterator for EncodedBulkPartIter {
175    type Item = error::Result<RecordBatch>;
176
177    fn next(&mut self) -> Option<Self::Item> {
178        let result = self.next_record_batch().transpose();
179
180        // Report metrics when iteration is complete
181        if result.is_none() {
182            self.report_mem_scan_metrics();
183        }
184
185        result
186    }
187}
188
189impl Drop for EncodedBulkPartIter {
190    fn drop(&mut self) {
191        common_telemetry::debug!(
192            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
193            self.context.region_id(),
194            self.metrics.total_series,
195            self.metrics.num_rows,
196            self.metrics.num_batches,
197            self.metrics.scan_cost,
198            self.metrics.prefilter_cost,
199            self.metrics.prefilter_rows_filtered
200        );
201
202        // Report MemScanMetrics if not already reported
203        self.report_mem_scan_metrics();
204
205        READ_ROWS_TOTAL
206            .with_label_values(&["bulk_memtable"])
207            .inc_by(self.metrics.num_rows as u64);
208        READ_STAGE_ELAPSED
209            .with_label_values(&["scan_memtable"])
210            .observe(self.metrics.scan_cost.as_secs_f64());
211    }
212}
213
/// Iterator for reading record batches from a bulk part.
///
/// Iterates through one or more RecordBatches, applying filters and projections.
pub struct BulkPartBatchIter {
    /// Queue of RecordBatches to process, consumed front to back.
    batches: VecDeque<RecordBatch>,
    /// Iterator context for filtering and projection.
    context: BulkIterContextRef,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Primary key filter for prefiltering before convert_batch.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to (taken on first report).
    mem_scan_metrics: Option<MemScanMetrics>,
}
231
232impl BulkPartBatchIter {
233    /// Creates a new [BulkPartBatchIter] from multiple RecordBatches.
234    pub fn new(
235        batches: Vec<RecordBatch>,
236        context: BulkIterContextRef,
237        sequence: Option<SequenceRange>,
238        series_count: usize,
239        mem_scan_metrics: Option<MemScanMetrics>,
240    ) -> Self {
241        assert!(context.read_format().as_flat().is_some());
242
243        let pk_filter = context.build_pk_filter();
244
245        Self {
246            batches: VecDeque::from(batches),
247            context,
248            sequence,
249            pk_filter,
250            metrics: MemScanMetricsData {
251                total_series: series_count,
252                ..Default::default()
253            },
254            mem_scan_metrics,
255        }
256    }
257
258    /// Creates a new [BulkPartBatchIter] from a single RecordBatch.
259    pub fn from_single(
260        record_batch: RecordBatch,
261        context: BulkIterContextRef,
262        sequence: Option<SequenceRange>,
263        series_count: usize,
264        mem_scan_metrics: Option<MemScanMetrics>,
265    ) -> Self {
266        Self::new(
267            vec![record_batch],
268            context,
269            sequence,
270            series_count,
271            mem_scan_metrics,
272        )
273    }
274
275    fn report_mem_scan_metrics(&mut self) {
276        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
277            mem_scan_metrics.merge_inner(&self.metrics);
278        }
279    }
280
281    /// Applies projection to the RecordBatch if needed.
282    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
283        let projection_indices = self.context.read_format().projection_indices();
284        if projection_indices.len() == record_batch.num_columns() {
285            return Ok(record_batch);
286        }
287
288        record_batch
289            .project(projection_indices)
290            .context(ComputeArrowSnafu)
291    }
292
293    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
294        let start = Instant::now();
295
296        // Apply projection first.
297        let projected_batch = self.apply_projection(record_batch)?;
298
299        // Apply combined filtering (both predicate and sequence filters)
300        let skip_fields = self.context.pre_filter_mode().skip_fields();
301
302        let Some(filtered_batch) = apply_combined_filters(
303            &self.context,
304            &self.sequence,
305            projected_batch,
306            skip_fields,
307            self.pk_filter
308                .as_mut()
309                .map(|f| f as &mut dyn PrimaryKeyFilter),
310            &mut self.metrics,
311        )?
312        else {
313            self.metrics.scan_cost += start.elapsed();
314            return Ok(None);
315        };
316
317        // Update metrics
318        self.metrics.num_batches += 1;
319        self.metrics.num_rows += filtered_batch.num_rows();
320        self.metrics.scan_cost += start.elapsed();
321
322        Ok(Some(filtered_batch))
323    }
324}
325
326impl Iterator for BulkPartBatchIter {
327    type Item = error::Result<RecordBatch>;
328
329    fn next(&mut self) -> Option<Self::Item> {
330        // Process batches until we find a non-empty one or run out
331        while let Some(batch) = self.batches.pop_front() {
332            match self.process_batch(batch) {
333                Ok(Some(result)) => return Some(Ok(result)),
334                Ok(None) => continue, // This batch was filtered out, try next
335                Err(e) => {
336                    self.report_mem_scan_metrics();
337                    return Some(Err(e));
338                }
339            }
340        }
341
342        // No more batches
343        self.report_mem_scan_metrics();
344        None
345    }
346}
347
348impl Drop for BulkPartBatchIter {
349    fn drop(&mut self) {
350        common_telemetry::debug!(
351            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
352            self.context.region_id(),
353            self.metrics.total_series,
354            self.metrics.num_rows,
355            self.metrics.num_batches,
356            self.metrics.scan_cost,
357            self.metrics.prefilter_cost,
358            self.metrics.prefilter_rows_filtered
359        );
360
361        // Report MemScanMetrics if not already reported
362        self.report_mem_scan_metrics();
363
364        READ_ROWS_TOTAL
365            .with_label_values(&["bulk_memtable"])
366            .inc_by(self.metrics.num_rows as u64);
367        READ_STAGE_ELAPSED
368            .with_label_values(&["scan_memtable"])
369            .observe(self.metrics.scan_cost.as_secs_f64());
370    }
371}
372
/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns None if the filtered batch is empty.
///
/// Filtering happens in three stages:
/// 1. optional primary-key prefilter on the raw batch, before format conversion;
/// 2. predicate filters from the context;
/// 3. sequence range filter.
/// The boolean masks from stages 2 and 3 are AND-combined and applied once at the end.
///
/// # Panics
/// Panics if the format is not flat.
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<SequenceRange>,
    record_batch: RecordBatch,
    skip_fields: bool,
    pk_filter: Option<&mut dyn PrimaryKeyFilter>,
    metrics: &mut MemScanMetricsData,
) -> error::Result<Option<RecordBatch>> {
    // Apply PK prefilter on raw batch before convert_batch to reduce conversion overhead.
    let has_pk_prefilter = pk_filter.is_some();
    let record_batch = if let Some(pk_filter) = pk_filter {
        let rows_before = record_batch.num_rows();
        let prefilter_start = Instant::now();
        let pk_col_idx = primary_key_column_index(record_batch.num_columns());
        match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? {
            Some(batch) => {
                // Some rows survived; record the cost and how many rows were dropped.
                metrics.prefilter_cost += prefilter_start.elapsed();
                metrics.prefilter_rows_filtered += rows_before - batch.num_rows();
                batch
            }
            None => {
                // The whole batch was filtered out by the primary key filter.
                metrics.prefilter_cost += prefilter_start.elapsed();
                metrics.prefilter_rows_filtered += rows_before;
                return Ok(None);
            }
        }
    } else {
        record_batch
    };

    // Converts the format to the flat format.
    let format = context.read_format().as_flat().unwrap();
    let record_batch = format.convert_batch(record_batch, None)?;

    let num_rows = record_batch.num_rows();
    // Combined row mask of all filters below; `None` means "keep all rows so far".
    let mut combined_filter = None;
    let mut tag_decode_state = TagDecodeState::new();

    // Apply predicate filters using the shared method.
    if !context.base.filters.is_empty() {
        let predicate_mask = context.base.compute_filter_mask_flat(
            &record_batch,
            skip_fields,
            has_pk_prefilter,
            &mut tag_decode_state,
        )?;
        // If predicate filters out the entire batch, return None early
        let Some(mask) = predicate_mask else {
            return Ok(None);
        };
        combined_filter = Some(BooleanArray::from(mask));
    }

    // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter = sequence
            .filter(&sequence_column)
            .context(ComputeArrowSnafu)?;
        // Combine with existing filter using AND operation
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // Apply the combined filter if any filters were applied
    let Some(filter_array) = combined_filter else {
        // No filters applied, return original batch
        return Ok(Some(record_batch));
    };
    // Fast paths: avoid copying when the mask selects nothing or everything.
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}
467
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::test_util::sst_util::new_primary_key;

    #[test]
    fn test_bulk_part_batch_iter() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create test data
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Create primary key dictionary array with properly encoded PKs.
        // Note: array types are already imported at module scope.
        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let pk3 = new_primary_key(&["key3"]);
        let values = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
            pk3.as_slice(),
        ]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Create a minimal region metadata for testing
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );
        // Iterates all rows.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Creates iter with sequence filter (only include sequences <= 2)
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        // Creates iter with projection and predicate.
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

    #[test]
    fn test_bulk_part_batch_iter_multiple_batches() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create first batch with 2 rows
        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000],
        ));
        let values_1 = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
        ]));
        let keys_1 = UInt32Array::from(vec![0, 1]);
        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_1,
                field1_1,
                timestamp_1,
                primary_key_1,
                sequence_1,
                op_type_1,
            ],
        )
        .unwrap();

        // Create second batch with 3 rows
        let pk3 = new_primary_key(&["key3"]);
        let pk4 = new_primary_key(&["key4"]);
        let pk5 = new_primary_key(&["key5"]);
        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![3000, 4000, 5000],
        ));
        let values_2 = Arc::new(BinaryArray::from_iter_values([
            pk3.as_slice(),
            pk4.as_slice(),
            pk5.as_slice(),
        ]));
        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_2,
                field1_2,
                timestamp_2,
                primary_key_2,
                sequence_2,
                op_type_2,
            ],
        )
        .unwrap();

        // Create region metadata
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );

        // Create iterator with multiple batches
        let expect_batches = vec![batch1, batch2];
        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);

        // Collect all results
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(expect_batches, result);
    }
}