// mito2/memtable/bulk/part_reader.rs
use std::collections::VecDeque;
use std::sync::Arc;
use bytes::Bytes;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::SequenceNumber;
use crate::error;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::row_group_reader::{
MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
};
use crate::read::Batch;
/// Iterator over the batches of an encoded bulk part, visiting its row groups in order.
pub struct BulkPartIter {
    /// Builds a reader for each row group that is visited.
    builder: MemtableRowGroupReaderBuilder,
    /// Reader for the row group currently being read, if any.
    current_reader: Option<PruneReader>,
    /// Row groups not visited yet, in the order they will be read.
    row_groups_to_read: VecDeque<usize>,
    /// Passed to `Batch::filter_by_sequence` for every batch that is returned.
    sequence: Option<SequenceNumber>,
}
impl BulkPartIter {
    /// Creates an iterator that reads `row_groups_to_read`, in order, from the
    /// encoded part held in `data` and described by `parquet_meta`.
    ///
    /// Only the columns listed in the context's projection indices are decoded.
    /// The reader for the first row group is built eagerly, so a failure to open
    /// it is reported here instead of on the first call to `next_batch`.
    /// Every returned batch is additionally filtered with
    /// `Batch::filter_by_sequence(sequence)`.
    ///
    /// # Errors
    /// Returns an error if the row group reader builder cannot be created or the
    /// reader for the first row group cannot be built.
    pub(crate) fn try_new(
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        parquet_meta: Arc<ParquetMetaData>,
        data: Bytes,
        sequence: Option<SequenceNumber>,
    ) -> error::Result<Self> {
        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder = MemtableRowGroupReaderBuilder::try_new(
            context.clone(),
            projection_mask,
            parquet_meta,
            data,
        )?;
        // `None` when there is no row group to read; the iterator is then empty.
        let init_reader = row_groups_to_read
            .pop_front()
            .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
            .transpose()?
            .map(|r| PruneReader::new(context, r));
        Ok(Self {
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
        })
    }

    /// Fetches the next non-empty batch, moving through the remaining row groups
    /// in order.
    ///
    /// Each batch is pruned by the context filters and then filtered by
    /// `self.sequence`. Batches left without rows are skipped. Returns `Ok(None)`
    /// once every row group has been exhausted.
    ///
    /// # Errors
    /// Propagates errors from reading or filtering a batch and from building the
    /// reader for the next row group.
    pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
        let Some(current) = &mut self.current_reader else {
            // Nothing to read, or all row groups have already been exhausted.
            return Ok(None);
        };

        loop {
            if let Some(mut batch) = current.next_batch()? {
                batch.filter_by_sequence(self.sequence)?;
                // The sequence filter may remove every row. Keep reading the same
                // row group rather than returning an empty batch.
                if !batch.is_empty() {
                    return Ok(Some(batch));
                }
                continue;
            }

            // The current row group is exhausted; switch to the next one, if any.
            let Some(next_row_group) = self.row_groups_to_read.pop_front() else {
                // Drop the exhausted reader so subsequent calls return immediately.
                self.current_reader = None;
                return Ok(None);
            };
            current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
        }
    }
}
impl Iterator for BulkPartIter {
    type Item = error::Result<Batch>;

    /// Yields `Ok(batch)` for each batch produced by `BulkPartIter::next_batch`,
    /// `Err(e)` when reading fails, and `None` once nothing is left.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_batch() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
/// Wraps a row group reader and applies the context's precise filters to each batch.
struct PruneReader {
    /// Reader for the row group currently being read; swapped out by `reset`.
    row_group_reader: MemtableRowGroupReader,
    /// Shared iteration context that supplies the filters.
    context: BulkIterContextRef,
}
impl PruneReader {
fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
Self {
context,
row_group_reader: reader,
}
}
fn next_batch(&mut self) -> error::Result<Option<Batch>> {
while let Some(b) = self.row_group_reader.next_inner()? {
match self.prune(b)? {
Some(b) => {
return Ok(Some(b));
}
None => {
continue;
}
}
}
Ok(None)
}
fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
if self.context.base.filters.is_empty() {
return Ok(Some(batch));
}
let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
return Ok(None);
};
if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
} else {
Ok(None)
}
}
fn reset(&mut self, reader: MemtableRowGroupReader) {
self.row_group_reader = reader;
}
}