// mito2/memtable/bulk/row_group_reader.rs
use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use datatypes::arrow::array::RecordBatch;
20use datatypes::arrow::error::ArrowError;
21use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
22use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
23use parquet::column::page::{PageIterator, PageReader};
24use parquet::file::metadata::ParquetMetaData;
25use snafu::ResultExt;
26
27use crate::error;
28use crate::error::ReadDataPartSnafu;
29use crate::memtable::bulk::context::BulkIterContextRef;
30use crate::sst::parquet::format::ReadFormat;
31use crate::sst::parquet::reader::RowGroupReaderContext;
32use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
33use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
34
/// Fetches pages of a row group from an in-memory encoded parquet buffer.
///
/// Unlike an on-disk SST read, every byte is already resident in `bytes`,
/// so "fetching" a byte range is just slicing the buffer.
pub struct MemtableRowGroupPageFetcher<'a> {
    /// Shared row-group reading state (column chunks, offset index, row count).
    base: RowGroupBase<'a>,
    /// Encoded parquet data; page byte ranges are sliced directly out of it.
    bytes: Bytes,
}
43
44impl<'a> MemtableRowGroupPageFetcher<'a> {
45 pub(crate) fn create(
46 row_group_idx: usize,
47 parquet_meta: &'a ParquetMetaData,
48 bytes: Bytes,
49 ) -> Self {
50 Self {
51 base: RowGroupBase::new(parquet_meta, row_group_idx),
53 bytes,
54 }
55 }
56
57 pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
59 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
60 let (fetch_ranges, page_start_offsets) =
62 self.base
63 .calc_sparse_read_ranges(projection, offset_index, selection);
64 if fetch_ranges.is_empty() {
65 return;
66 }
67 let chunk_data = self.fetch_bytes(&fetch_ranges);
68
69 self.base
70 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
71 } else {
72 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
73 if fetch_ranges.is_empty() {
74 return;
76 }
77 let chunk_data = self.fetch_bytes(&fetch_ranges);
78 self.base.assign_dense_chunk(projection, chunk_data);
79 }
80 }
81
82 fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
83 ranges
84 .iter()
85 .map(|range| self.bytes.slice(range.start as usize..range.end as usize))
86 .collect()
87 }
88
89 fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
91 let reader = self.base.column_reader(i)?;
92 Ok(Box::new(reader))
93 }
94}
95
96impl RowGroups for MemtableRowGroupPageFetcher<'_> {
97 fn num_rows(&self) -> usize {
98 self.base.row_count
99 }
100
101 fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
102 Ok(Box::new(ColumnChunkIterator {
103 reader: Some(self.column_page_reader(i)),
104 }))
105 }
106}
107
108impl RowGroupReaderContext for BulkIterContextRef {
109 fn map_result(
110 &self,
111 result: Result<Option<RecordBatch>, ArrowError>,
112 ) -> error::Result<Option<RecordBatch>> {
113 result.context(error::DecodeArrowRowGroupSnafu)
114 }
115
116 fn read_format(&self) -> &ReadFormat {
117 self.as_ref().read_format()
118 }
119}
120
/// Builds [`ParquetRecordBatchReader`]s over row groups of one in-memory
/// encoded parquet buffer, reusing the projection and field levels.
pub(crate) struct MemtableRowGroupReaderBuilder {
    /// Columns to read.
    projection: ProjectionMask,
    /// Parquet metadata of the encoded buffer.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Arrow field levels derived from the parquet schema and projection.
    field_levels: FieldLevels,
    /// The encoded parquet data.
    data: Bytes,
}
127
128impl MemtableRowGroupReaderBuilder {
129 pub(crate) fn try_new(
130 context: &BulkIterContextRef,
131 projection: ProjectionMask,
132 parquet_metadata: Arc<ParquetMetaData>,
133 data: Bytes,
134 ) -> error::Result<Self> {
135 let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
136 let hint = Some(context.read_format().arrow_schema().fields());
137 let field_levels =
138 parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
139 .context(ReadDataPartSnafu)?;
140 Ok(Self {
141 projection,
142 parquet_metadata,
143 field_levels,
144 data,
145 })
146 }
147
148 pub(crate) fn build_row_group_reader(
150 &self,
151 row_group_idx: usize,
152 row_selection: Option<RowSelection>,
153 ) -> error::Result<ParquetRecordBatchReader> {
154 let mut row_group = MemtableRowGroupPageFetcher::create(
155 row_group_idx,
156 &self.parquet_metadata,
157 self.data.clone(),
158 );
159 row_group.fetch(&self.projection, row_selection.as_ref());
161
162 ParquetRecordBatchReader::try_new_with_row_groups(
165 &self.field_levels,
166 &row_group,
167 DEFAULT_READ_BATCH_SIZE,
168 row_selection,
169 )
170 .context(ReadDataPartSnafu)
171 }
172}