mito2/memtable/bulk/part_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::time::Instant;

use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
use store_api::storage::SequenceRange;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;

/// Iterator for reading data inside a bulk part.
pub struct EncodedBulkPartIter {
    context: BulkIterContextRef,
    row_groups_to_read: VecDeque<usize>,
    current_reader: Option<ParquetRecordBatchReader>,
    builder: MemtableRowGroupReaderBuilder,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Cached `skip_fields` for the current row group.
    current_skip_fields: bool,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}

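// Iteration flow: `try_new` eagerly opens the first row group;
// `next_record_batch` then drains it batch by batch, lazily building a reader
// for the next row group once the current one is exhausted.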
impl EncodedBulkPartIter {
    /// Creates a new [EncodedBulkPartIter].
    pub(crate) fn try_new(
        encoded_part: &EncodedBulkPart,
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
        let data = encoded_part.data().clone();
        let series_count = encoded_part.metadata().num_series as usize;

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
            Some(first_row_group) => {
                let skip_fields = builder.compute_skip_fields(&context, first_row_group);
                let reader = builder.build_row_group_reader(first_row_group, None)?;
                (Some(reader), skip_fields)
            }
            None => (None, false),
        };

        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
            current_skip_fields,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        })
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

    /// Fetches the next non-empty record batch.
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

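        // Note: `scan_cost` accumulates across calls; every return path in
        // this method adds the elapsed time of the current invocation.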
        let Some(current) = &mut self.current_reader else {
            // All row groups are exhausted.
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(
                &self.context,
                &self.sequence,
                batch,
                self.current_skip_fields,
            )? {
                // Update metrics.
                self.metrics.num_batches += 1;
                self.metrics.num_rows += batch.num_rows();
                self.metrics.scan_cost += start.elapsed();
                return Ok(Some(batch));
            }
        }

        // The current row group is exhausted; advance to the next one.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            // Compute `skip_fields` for this row group.
            self.current_skip_fields = self
                .builder
                .compute_skip_fields(&self.context, next_row_group);

            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(
                    &self.context,
                    &self.sequence,
                    batch,
                    self.current_skip_fields,
                )? {
                    // Update metrics.
                    self.metrics.num_batches += 1;
                    self.metrics.num_rows += batch.num_rows();
                    self.metrics.scan_cost += start.elapsed();
                    return Ok(Some(batch));
                }
            }
        }

        self.metrics.scan_cost += start.elapsed();
        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let result = self.next_record_batch().transpose();

        // Report metrics when iteration is complete.
        if result.is_none() {
            self.report_mem_scan_metrics();
        }

        result
    }
}

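// Dropping the iterator flushes the metrics even when the caller abandons the
// scan early, so the counters below also account for partial reads.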
impl Drop for EncodedBulkPartIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        // Report MemScanMetrics if not already reported.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

/// Iterator for a record batch in a bulk part.
pub struct BulkPartRecordBatchIter {
    /// The [RecordBatch] to read from.
    record_batch: Option<RecordBatch>,
    /// Iterator context for filtering.
    context: BulkIterContextRef,
    /// Sequence number filter.
    sequence: Option<SequenceRange>,
    /// Metrics for this iterator.
    metrics: MemScanMetricsData,
    /// Optional memory scan metrics to report to.
    mem_scan_metrics: Option<MemScanMetrics>,
}

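// This iterator yields at most one batch: `next` consumes the stored
// `RecordBatch` on the first call and returns `None` afterwards.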
impl BulkPartRecordBatchIter {
    /// Creates a new [BulkPartRecordBatchIter] from a RecordBatch.
    pub fn new(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        Self {
            record_batch: Some(record_batch),
            context,
            sequence,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        }
    }

    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

    /// Applies projection to the RecordBatch if needed.
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
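        // If the projection already covers every column of the batch, return
        // it unchanged to avoid building a new batch.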
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        // Apply projection first.
        let projected_batch = self.apply_projection(record_batch)?;
        // Apply combined filtering (both predicate and sequence filters).
        // `BulkPartRecordBatchIter` has no row group information, so
        // `SkipFieldsOnDelete` cannot be refined and conservatively behaves
        // like `SkipFields`.
        let skip_fields = match self.context.pre_filter_mode() {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields | PreFilterMode::SkipFieldsOnDelete => true,
        };
        let Some(filtered_batch) =
            apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
        else {
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        // Update metrics.
        self.metrics.num_batches += 1;
        self.metrics.num_rows += filtered_batch.num_rows();
        self.metrics.scan_cost += start.elapsed();

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartRecordBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let Some(record_batch) = self.record_batch.take() else {
            // The batch was already consumed; `report_mem_scan_metrics` is a
            // no-op after the first report, so calling it again is cheap.
            self.report_mem_scan_metrics();
            return None;
        };

        let result = self.process_batch(record_batch).transpose();

        // Report metrics when iteration is complete.
        if result.is_none() {
            self.report_mem_scan_metrics();
        }

        result
    }
}

impl Drop for BulkPartRecordBatchIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "BulkPartRecordBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        // Report MemScanMetrics if not already reported.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns `None` if the filtered batch is empty.
///
/// # Panics
/// Panics if the format is not flat.
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<SequenceRange>,
    record_batch: RecordBatch,
    skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
    // Converts the batch to the flat format first.
    let format = context.read_format().as_flat().unwrap();
    let record_batch = format.convert_batch(record_batch, None)?;

    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;

    // First, apply predicate filters using the shared method.
    if !context.base.filters.is_empty() {
        let predicate_mask = context
            .base
            .compute_filter_mask_flat(&record_batch, skip_fields)?;
        // If the predicate filters out the entire batch, return None early.
        let Some(mask) = predicate_mask else {
            return Ok(None);
        };
        combined_filter = Some(BooleanArray::from(mask));
    }

    // Filters rows by the given `sequence` range, preserving only rows whose
    // sequence number satisfies the range (e.g. less than or equal to the
    // maximum for `SequenceRange::LtEq`).
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter = sequence
            .filter(&sequence_column)
            .context(ComputeArrowSnafu)?;
        // Combine with the existing filter using AND.
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // Apply the combined filter if any filters were applied.
    let Some(filter_array) = combined_filter else {
        // No filters were applied, return the original batch.
        return Ok(Some(record_batch));
    };
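    // Fast paths: avoid copying when the filter selects no rows or all rows.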
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    #[test]
    fn test_bulk_part_record_batch_iter() {
        // Create a simple schema.
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create test data.
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Create the primary key dictionary array.
        use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Create a minimal region metadata for testing.
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None, // No projection
                None, // No predicate
                false,
            )
            .unwrap(),
        );
        // Iterates over all rows.
        let iter =
            BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Creates an iter with a sequence filter (only include sequences <= 2).
        let iter = BulkPartRecordBatchIter::new(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        // Creates an iter with projection and predicate.
        let iter =
            BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }
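
    /// A minimal sketch (not one of the original tests) of the mask
    /// combination `apply_combined_filters` performs: the predicate mask and
    /// the sequence mask are ANDed before `filter_record_batch` is applied.
    /// The masks below are illustrative values, not output from a real scan.
    #[test]
    fn test_combined_filter_mask_semantics() {
        use datatypes::arrow::compute::{and, filter_record_batch};

        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef],
        )
        .unwrap();

        // The predicate mask keeps rows 0, 1 and 3; the sequence mask keeps rows 1, 2 and 3.
        let predicate_mask = BooleanArray::from(vec![true, true, false, true]);
        let sequence_mask = BooleanArray::from(vec![false, true, true, true]);
        let combined = and(&predicate_mask, &sequence_mask).unwrap();

        // Only rows passing both masks survive, mirroring the AND combination
        // in `apply_combined_filters`.
        let filtered = filter_record_batch(&batch, &combined).unwrap();
        assert_eq!(2, filtered.num_rows());
        let expected = Arc::new(Int64Array::from(vec![2, 4])) as ArrayRef;
        assert_eq!(&expected, filtered.column(0));
    }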
}