mito2/memtable/bulk/
part_reader.rs1use std::collections::VecDeque;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use parquet::arrow::ProjectionMask;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::SequenceNumber;
22
23use crate::error;
24use crate::memtable::bulk::context::BulkIterContextRef;
25use crate::memtable::bulk::row_group_reader::{
26 MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
27};
28use crate::read::Batch;
29
30pub struct BulkPartIter {
32 row_groups_to_read: VecDeque<usize>,
33 current_reader: Option<PruneReader>,
34 builder: MemtableRowGroupReaderBuilder,
35 sequence: Option<SequenceNumber>,
36}
37
38impl BulkPartIter {
39 pub(crate) fn try_new(
41 context: BulkIterContextRef,
42 mut row_groups_to_read: VecDeque<usize>,
43 parquet_meta: Arc<ParquetMetaData>,
44 data: Bytes,
45 sequence: Option<SequenceNumber>,
46 ) -> error::Result<Self> {
47 let projection_mask = ProjectionMask::roots(
48 parquet_meta.file_metadata().schema_descr(),
49 context.read_format().projection_indices().iter().copied(),
50 );
51
52 let builder = MemtableRowGroupReaderBuilder::try_new(
53 context.clone(),
54 projection_mask,
55 parquet_meta,
56 data,
57 )?;
58
59 let init_reader = row_groups_to_read
60 .pop_front()
61 .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
62 .transpose()?
63 .map(|r| PruneReader::new(context, r));
64 Ok(Self {
65 row_groups_to_read,
66 current_reader: init_reader,
67 builder,
68 sequence,
69 })
70 }
71
72 pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
73 let Some(current) = &mut self.current_reader else {
74 return Ok(None);
76 };
77
78 if let Some(mut batch) = current.next_batch()? {
79 batch.filter_by_sequence(self.sequence)?;
80 return Ok(Some(batch));
81 }
82
83 while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
85 current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
86 if let Some(mut next_batch) = current.next_batch()? {
87 next_batch.filter_by_sequence(self.sequence)?;
88 return Ok(Some(next_batch));
89 }
90 }
91 Ok(None)
92 }
93}
94
95impl Iterator for BulkPartIter {
96 type Item = error::Result<Batch>;
97
98 fn next(&mut self) -> Option<Self::Item> {
99 self.next_batch().transpose()
100 }
101}
102
103struct PruneReader {
104 context: BulkIterContextRef,
105 row_group_reader: MemtableRowGroupReader,
106}
107
108impl PruneReader {
110 fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
111 Self {
112 context,
113 row_group_reader: reader,
114 }
115 }
116
117 fn next_batch(&mut self) -> error::Result<Option<Batch>> {
119 while let Some(b) = self.row_group_reader.next_inner()? {
120 match self.prune(b)? {
121 Some(b) => {
122 return Ok(Some(b));
123 }
124 None => {
125 continue;
126 }
127 }
128 }
129 Ok(None)
130 }
131
132 fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
134 if self.context.base.filters.is_empty() {
138 return Ok(Some(batch));
139 }
140
141 let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
142 return Ok(None);
144 };
145 if !batch_filtered.is_empty() {
146 Ok(Some(batch_filtered))
147 } else {
148 Ok(None)
149 }
150 }
151
152 fn reset(&mut self, reader: MemtableRowGroupReader) {
153 self.row_group_reader = reader;
154 }
155}