// mito2/memtable/bulk/row_group_reader.rs

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::column::page::{PageIterator, PageReader};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use snafu::ResultExt;

use crate::error;
use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::RowGroupReaderContext;
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
/// Fetches pages of a single parquet row group from an in-memory buffer so
/// they can be decoded by a [`ParquetRecordBatchReader`].
pub struct MemtableRowGroupPageFetcher<'a> {
    /// Shared per-row-group state (metadata, offset index, fetched column chunks).
    base: RowGroupBase<'a>,
    /// The encoded parquet bytes backing this memtable part; page data is
    /// sliced out of this buffer.
    bytes: Bytes,
}
43
44impl<'a> MemtableRowGroupPageFetcher<'a> {
45 pub(crate) fn create(
46 row_group_idx: usize,
47 parquet_meta: &'a ParquetMetaData,
48 bytes: Bytes,
49 ) -> Self {
50 Self {
51 base: RowGroupBase::new(parquet_meta, row_group_idx),
53 bytes,
54 }
55 }
56
57 pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
59 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
60 let (fetch_ranges, page_start_offsets) =
62 self.base
63 .calc_sparse_read_ranges(projection, offset_index, selection);
64 if fetch_ranges.is_empty() {
65 return;
66 }
67 let chunk_data = self.fetch_bytes(&fetch_ranges);
68
69 self.base
70 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
71 } else {
72 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
73 if fetch_ranges.is_empty() {
74 return;
76 }
77 let chunk_data = self.fetch_bytes(&fetch_ranges);
78 self.base.assign_dense_chunk(projection, chunk_data);
79 }
80 }
81
82 fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
83 ranges
84 .iter()
85 .map(|range| self.bytes.slice(range.start as usize..range.end as usize))
86 .collect()
87 }
88
89 fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
91 let reader = self.base.column_reader(i)?;
92 Ok(Box::new(reader))
93 }
94}
95
96impl RowGroups for MemtableRowGroupPageFetcher<'_> {
97 fn num_rows(&self) -> usize {
98 self.base.row_count
99 }
100
101 fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
102 Ok(Box::new(ColumnChunkIterator {
103 reader: Some(self.column_page_reader(i)),
104 }))
105 }
106
107 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
108 Box::new(std::iter::once(self.base.row_group_metadata()))
109 }
110
111 fn metadata(&self) -> &ParquetMetaData {
112 self.base.parquet_metadata()
113 }
114}
115
116impl RowGroupReaderContext for BulkIterContextRef {
117 fn map_result(
118 &self,
119 result: Result<Option<RecordBatch>, ArrowError>,
120 ) -> error::Result<Option<RecordBatch>> {
121 result.context(error::DecodeArrowRowGroupSnafu)
122 }
123
124 fn read_format(&self) -> &ReadFormat {
125 self.as_ref().read_format()
126 }
127}
128
/// Builds [`ParquetRecordBatchReader`]s that decode row groups from an
/// in-memory parquet buffer.
pub(crate) struct MemtableRowGroupReaderBuilder {
    /// Columns to read.
    projection: ProjectionMask,
    /// Metadata of the encoded parquet buffer.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Parquet-to-arrow field mapping for the projected columns.
    field_levels: FieldLevels,
    /// The encoded parquet bytes to read from.
    data: Bytes,
}
135
136impl MemtableRowGroupReaderBuilder {
137 pub(crate) fn try_new(
138 context: &BulkIterContextRef,
139 projection: ProjectionMask,
140 parquet_metadata: Arc<ParquetMetaData>,
141 data: Bytes,
142 ) -> error::Result<Self> {
143 let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
144 let hint = Some(context.read_format().arrow_schema().fields());
145 let field_levels =
146 parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
147 .context(ReadDataPartSnafu)?;
148 Ok(Self {
149 projection,
150 parquet_metadata,
151 field_levels,
152 data,
153 })
154 }
155
156 pub(crate) fn build_row_group_reader(
158 &self,
159 row_group_idx: usize,
160 row_selection: Option<RowSelection>,
161 ) -> error::Result<ParquetRecordBatchReader> {
162 let mut row_group = MemtableRowGroupPageFetcher::create(
163 row_group_idx,
164 &self.parquet_metadata,
165 self.data.clone(),
166 );
167 row_group.fetch(&self.projection, row_selection.as_ref());
169
170 ParquetRecordBatchReader::try_new_with_row_groups(
173 &self.field_levels,
174 &row_group,
175 DEFAULT_READ_BATCH_SIZE,
176 row_selection,
177 )
178 .context(ReadDataPartSnafu)
179 }
180
181 pub(crate) fn compute_skip_fields(
183 &self,
184 context: &BulkIterContextRef,
185 row_group_idx: usize,
186 ) -> bool {
187 use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
188
189 match context.pre_filter_mode() {
190 PreFilterMode::All => false,
191 PreFilterMode::SkipFields => true,
192 PreFilterMode::SkipFieldsOnDelete => {
193 row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable")
195 .unwrap_or(true)
196 }
197 }
198 }
199}