mito2/memtable/bulk/
row_group_reader.rs1use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use datatypes::arrow::array::RecordBatch;
20use datatypes::arrow::error::ArrowError;
21use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
22use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
23use parquet::column::page::{PageIterator, PageReader};
24use parquet::file::metadata::ParquetMetaData;
25use snafu::ResultExt;
26
27use crate::error;
28use crate::error::ReadDataPartSnafu;
29use crate::memtable::bulk::context::BulkIterContextRef;
30use crate::sst::parquet::format::ReadFormat;
31use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
32use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
33use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
34
35pub struct MemtableRowGroupPageFetcher<'a> {
39 base: RowGroupBase<'a>,
41 bytes: Bytes,
42}
43
44impl<'a> MemtableRowGroupPageFetcher<'a> {
45 pub(crate) fn create(
46 row_group_idx: usize,
47 parquet_meta: &'a ParquetMetaData,
48 bytes: Bytes,
49 ) -> Self {
50 Self {
51 base: RowGroupBase::new(parquet_meta, row_group_idx),
53 bytes,
54 }
55 }
56
57 pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
59 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
60 let (fetch_ranges, page_start_offsets) =
62 self.base
63 .calc_sparse_read_ranges(projection, offset_index, selection);
64 if fetch_ranges.is_empty() {
65 return;
66 }
67 let chunk_data = self.fetch_bytes(&fetch_ranges);
68
69 self.base
70 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
71 } else {
72 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
73 if fetch_ranges.is_empty() {
74 return;
76 }
77 let chunk_data = self.fetch_bytes(&fetch_ranges);
78 self.base.assign_dense_chunk(projection, chunk_data);
79 }
80 }
81
82 fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
83 ranges
84 .iter()
85 .map(|range| self.bytes.slice(range.start as usize..range.end as usize))
86 .collect()
87 }
88
89 fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
91 let reader = self.base.column_reader(i)?;
92 Ok(Box::new(reader))
93 }
94}
95
96impl RowGroups for MemtableRowGroupPageFetcher<'_> {
97 fn num_rows(&self) -> usize {
98 self.base.row_count
99 }
100
101 fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
102 Ok(Box::new(ColumnChunkIterator {
103 reader: Some(self.column_page_reader(i)),
104 }))
105 }
106}
107
108impl RowGroupReaderContext for BulkIterContextRef {
109 fn map_result(
110 &self,
111 result: Result<Option<RecordBatch>, ArrowError>,
112 ) -> error::Result<Option<RecordBatch>> {
113 result.context(error::DecodeArrowRowGroupSnafu)
114 }
115
116 fn read_format(&self) -> &ReadFormat {
117 self.as_ref().read_format()
118 }
119}
120
121pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;
122
123pub(crate) struct MemtableRowGroupReaderBuilder {
124 context: BulkIterContextRef,
125 projection: ProjectionMask,
126 parquet_metadata: Arc<ParquetMetaData>,
127 field_levels: FieldLevels,
128 data: Bytes,
129}
130
131impl MemtableRowGroupReaderBuilder {
132 pub(crate) fn try_new(
133 context: BulkIterContextRef,
134 projection: ProjectionMask,
135 parquet_metadata: Arc<ParquetMetaData>,
136 data: Bytes,
137 ) -> error::Result<Self> {
138 let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
139 let hint = Some(context.read_format().arrow_schema().fields());
140 let field_levels =
141 parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
142 .context(ReadDataPartSnafu)?;
143 Ok(Self {
144 context,
145 projection,
146 parquet_metadata,
147 field_levels,
148 data,
149 })
150 }
151
152 pub(crate) fn build_row_group_reader(
154 &self,
155 row_group_idx: usize,
156 row_selection: Option<RowSelection>,
157 ) -> error::Result<MemtableRowGroupReader> {
158 let mut row_group = MemtableRowGroupPageFetcher::create(
159 row_group_idx,
160 &self.parquet_metadata,
161 self.data.clone(),
162 );
163 row_group.fetch(&self.projection, row_selection.as_ref());
165
166 let reader = ParquetRecordBatchReader::try_new_with_row_groups(
169 &self.field_levels,
170 &row_group,
171 DEFAULT_READ_BATCH_SIZE,
172 row_selection,
173 )
174 .context(ReadDataPartSnafu)?;
175 Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
176 }
177}