mito2/memtable/bulk/part_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::ops::BitAnd;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::SequenceNumber;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};

/// Iterator for reading data inside a bulk part.
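///
/// A minimal usage sketch (not taken from a real call site), assuming the
/// caller already has a [BulkIterContextRef], the parquet metadata and the
/// encoded part bytes; the variable names below are hypothetical:
///
/// ```ignore
/// let row_groups: VecDeque<usize> = (0..parquet_meta.num_row_groups()).collect();
/// let iter = EncodedBulkPartIter::try_new(context, row_groups, parquet_meta, data, None)?;
/// for batch in iter {
///     // Each item is an `error::Result<RecordBatch>` with all filters already applied.
///     let batch = batch?;
/// }
/// ```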
pub struct EncodedBulkPartIter {
    context: BulkIterContextRef,
    row_groups_to_read: VecDeque<usize>,
    current_reader: Option<ParquetRecordBatchReader>,
    builder: MemtableRowGroupReaderBuilder,
    /// Sequence number filter.
    sequence: Option<Scalar<UInt64Array>>,
}

impl EncodedBulkPartIter {
    /// Creates a new [EncodedBulkPartIter].
    pub(crate) fn try_new(
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        parquet_meta: Arc<ParquetMetaData>,
        data: Bytes,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let sequence = sequence.map(UInt64Array::new_scalar);

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        let init_reader = row_groups_to_read
            .pop_front()
            .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
            .transpose()?;
        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
        })
    }

    /// Fetches the next non-empty record batch.
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let Some(current) = &mut self.current_reader else {
            // All row groups are exhausted.
            return Ok(None);
        };

        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
                return Ok(Some(batch));
            }
        }

        // Previous row group exhausted, read the next row group.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
                    return Ok(Some(batch));
                }
            }
        }

        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_record_batch().transpose()
    }
}

/// Iterator for a record batch in a bulk part.
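///
/// A minimal usage sketch, assuming a flat-format `RecordBatch` and a matching
/// [BulkIterContextRef] (both hypothetical here); passing `Some(2)` keeps only
/// rows with sequence `<= 2`:
///
/// ```ignore
/// let iter = BulkPartRecordBatchIter::new(record_batch, context, Some(2));
/// // The iterator yields at most one projected and filtered batch.
/// for batch in iter {
///     let batch = batch?;
/// }
/// ```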
pub struct BulkPartRecordBatchIter {
    /// The RecordBatch to read from.
    record_batch: Option<RecordBatch>,
    /// Iterator context for filtering.
    context: BulkIterContextRef,
    /// Sequence number filter.
    sequence: Option<Scalar<UInt64Array>>,
}

impl BulkPartRecordBatchIter {
    /// Creates a new [BulkPartRecordBatchIter] from a RecordBatch.
    pub fn new(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        let sequence = sequence.map(UInt64Array::new_scalar);

        Self {
            record_batch: Some(record_batch),
            context,
            sequence,
        }
    }

    /// Applies projection to the RecordBatch if needed.
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        // Apply projection first.
        let projected_batch = self.apply_projection(record_batch)?;
        // Apply combined filtering (both predicate and sequence filters).
        let Some(filtered_batch) =
            apply_combined_filters(&self.context, &self.sequence, projected_batch)?
        else {
            return Ok(None);
        };

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartRecordBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let record_batch = self.record_batch.take()?;

        self.process_batch(record_batch).transpose()
    }
}

// TODO(yingwen): Support sparse encoding, which doesn't have decoded primary key columns.
/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns None if the filtered batch is empty.
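///
/// A sketch of how the two masks combine, using plain Arrow kernels with
/// made-up values (the real predicate mask comes from evaluating the filters
/// in `context`):
///
/// ```ignore
/// use datatypes::arrow::array::{BooleanArray, UInt64Array};
/// use datatypes::arrow::compute::kernels::cmp::lt_eq;
///
/// // Mask produced by the predicate filters.
/// let predicate_mask = BooleanArray::from(vec![true, true, false]);
/// // Sequence mask: keep rows with sequence <= 2.
/// let sequences = UInt64Array::from(vec![1, 2, 3]);
/// let visible = UInt64Array::new_scalar(2);
/// let sequence_mask = lt_eq(&sequences, &visible)?;
/// // The final filter is the conjunction of both masks.
/// let combined = datatypes::arrow::compute::and(&predicate_mask, &sequence_mask)?;
/// assert_eq!(combined, BooleanArray::from(vec![true, true, false]));
/// ```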
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<Scalar<UInt64Array>>,
    record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;

    // First, apply predicate filters.
    if !context.base.filters.is_empty() {
        let mut mask = BooleanBuffer::new_set(num_rows);

        // Run the filters one by one and combine their results, similar to RangeBase::precise_filter.
        for filter_ctx in &context.base.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Column matches.
                MaybeFilter::Matched => continue,
                // Column doesn't match, filter the entire batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            // Safety: We checked the format type in new().
            let Some(column_index) = context
                .read_format()
                .as_flat()
                .unwrap()
                .projected_index_by_id(filter_ctx.column_id())
            else {
                continue;
            };
            let array = record_batch.column(column_index);
            let result = filter
                .evaluate_array(array)
                .context(crate::error::RecordBatchSnafu)?;

            mask = mask.bitand(&result);
        }
        // Convert the mask to BooleanArray
        combined_filter = Some(BooleanArray::from(mask));
    }

    // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter =
            datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
                .context(ComputeArrowSnafu)?;
        // Combine with existing filter using AND operation
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // Apply the combined filter if any filters were applied
    let Some(filter_array) = combined_filter else {
        // No filters applied, return original batch
        return Ok(Some(record_batch));
    };
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    #[test]
    fn test_bulk_part_record_batch_iter() {
        // Create a simple schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Create test data
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Create primary key dictionary array
        use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); // PUT operations

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Create a minimal region metadata for testing
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // Create context
        let context = Arc::new(BulkIterContext::new(
            Arc::new(region_metadata.clone()),
            &None, // No projection
            None,  // No predicate
        ));
        // Iterates all rows.
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // Creates iter with sequence filter (only include sequences <= 2)
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context, Some(2));
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        let context = Arc::new(BulkIterContext::new(
            Arc::new(region_metadata),
            &Some(&[0, 2]),
            Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
        ));
        // Creates iter with projection and predicate.
        let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }
}