mito2/memtable/bulk/
part_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use parquet::arrow::ProjectionMask;
20use parquet::file::metadata::ParquetMetaData;
21use store_api::storage::SequenceNumber;
22
23use crate::error;
24use crate::memtable::bulk::context::BulkIterContextRef;
25use crate::memtable::bulk::row_group_reader::{
26    MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
27};
28use crate::read::Batch;
29
30/// Iterator for reading data inside a bulk part.
31pub struct BulkPartIter {
32    row_groups_to_read: VecDeque<usize>,
33    current_reader: Option<PruneReader>,
34    builder: MemtableRowGroupReaderBuilder,
35    sequence: Option<SequenceNumber>,
36}
37
38impl BulkPartIter {
39    /// Creates a new [BulkPartIter].
40    pub(crate) fn try_new(
41        context: BulkIterContextRef,
42        mut row_groups_to_read: VecDeque<usize>,
43        parquet_meta: Arc<ParquetMetaData>,
44        data: Bytes,
45        sequence: Option<SequenceNumber>,
46    ) -> error::Result<Self> {
47        let projection_mask = ProjectionMask::roots(
48            parquet_meta.file_metadata().schema_descr(),
49            context.read_format().projection_indices().iter().copied(),
50        );
51
52        let builder = MemtableRowGroupReaderBuilder::try_new(
53            context.clone(),
54            projection_mask,
55            parquet_meta,
56            data,
57        )?;
58
59        let init_reader = row_groups_to_read
60            .pop_front()
61            .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
62            .transpose()?
63            .map(|r| PruneReader::new(context, r));
64        Ok(Self {
65            row_groups_to_read,
66            current_reader: init_reader,
67            builder,
68            sequence,
69        })
70    }
71
72    pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
73        let Some(current) = &mut self.current_reader else {
74            // All row group exhausted.
75            return Ok(None);
76        };
77
78        if let Some(mut batch) = current.next_batch()? {
79            batch.filter_by_sequence(self.sequence)?;
80            return Ok(Some(batch));
81        }
82
83        // Previous row group exhausted, read next row group
84        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
85            current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
86            if let Some(mut next_batch) = current.next_batch()? {
87                next_batch.filter_by_sequence(self.sequence)?;
88                return Ok(Some(next_batch));
89            }
90        }
91        Ok(None)
92    }
93}
94
95impl Iterator for BulkPartIter {
96    type Item = error::Result<Batch>;
97
98    fn next(&mut self) -> Option<Self::Item> {
99        self.next_batch().transpose()
100    }
101}
102
103struct PruneReader {
104    context: BulkIterContextRef,
105    row_group_reader: MemtableRowGroupReader,
106}
107
108//todo(hl): maybe we also need to support lastrow mode here.
109impl PruneReader {
110    fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
111        Self {
112            context,
113            row_group_reader: reader,
114        }
115    }
116
117    /// Iterates current inner reader until exhausted.
118    fn next_batch(&mut self) -> error::Result<Option<Batch>> {
119        while let Some(b) = self.row_group_reader.next_inner()? {
120            match self.prune(b)? {
121                Some(b) => {
122                    return Ok(Some(b));
123                }
124                None => {
125                    continue;
126                }
127            }
128        }
129        Ok(None)
130    }
131
132    /// Prunes batch according to filters.
133    fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
134        //todo(hl): add metrics.
135
136        // fast path
137        if self.context.base.filters.is_empty() {
138            return Ok(Some(batch));
139        }
140
141        let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
142            // the entire batch is filtered out
143            return Ok(None);
144        };
145        if !batch_filtered.is_empty() {
146            Ok(Some(batch_filtered))
147        } else {
148            Ok(None)
149        }
150    }
151
152    fn reset(&mut self, reader: MemtableRowGroupReader) {
153        self.row_group_reader = reader;
154    }
155}