1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::ProjectionMask;
23use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::reader::{ChunkReader, Length};
29use parquet::file::serialized_reader::SerializedPageReader;
30use store_api::storage::{FileId, RegionId};
31use tokio::task::yield_now;
32
33use crate::cache::file_cache::{FileType, IndexKey};
34use crate::cache::{CacheStrategy, PageKey, PageValue};
35use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
36use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
37
38pub(crate) struct RowGroupBase<'a> {
39 metadata: &'a RowGroupMetaData,
40 pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
41 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
43 pub(crate) row_count: usize,
44}
45
46impl<'a> RowGroupBase<'a> {
47 pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
48 let metadata = parquet_meta.row_group(row_group_idx);
49 let offset_index = parquet_meta
53 .offset_index()
54 .filter(|index| !index.is_empty())
56 .map(|x| x[row_group_idx].as_slice());
57
58 Self {
59 metadata,
60 offset_index,
61 column_chunks: vec![None; metadata.columns().len()],
62 row_count: metadata.num_rows() as usize,
63 }
64 }
65
66 pub(crate) fn calc_sparse_read_ranges(
67 &self,
68 projection: &ProjectionMask,
69 offset_index: &[OffsetIndexMetaData],
70 selection: &RowSelection,
71 ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
72 let mut page_start_offsets: Vec<Vec<usize>> = vec![];
75 let ranges = self
76 .column_chunks
77 .iter()
78 .zip(self.metadata.columns())
79 .enumerate()
80 .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
81 .flat_map(|(idx, (_chunk, chunk_meta))| {
82 let mut ranges = vec![];
85 let (start, _len) = chunk_meta.byte_range();
86 match offset_index[idx].page_locations.first() {
87 Some(first) if first.offset as u64 != start => {
88 ranges.push(start..first.offset as u64);
89 }
90 _ => (),
91 }
92
93 ranges.extend(
94 selection
95 .scan_ranges(&offset_index[idx].page_locations)
96 .iter()
97 .map(|range| range.start..range.end),
98 );
99 page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());
100
101 ranges
102 })
103 .collect::<Vec<_>>();
104 (ranges, page_start_offsets)
105 }
106
107 pub(crate) fn assign_sparse_chunk(
108 &mut self,
109 projection: &ProjectionMask,
110 data: Vec<Bytes>,
111 page_start_offsets: Vec<Vec<usize>>,
112 ) {
113 let mut page_start_offsets = page_start_offsets.into_iter();
114 let mut chunk_data = data.into_iter();
115
116 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
117 if chunk.is_some() || !projection.leaf_included(idx) {
118 continue;
119 }
120
121 if let Some(offsets) = page_start_offsets.next() {
122 let mut chunks = Vec::with_capacity(offsets.len());
123 for _ in 0..offsets.len() {
124 chunks.push(chunk_data.next().unwrap());
125 }
126
127 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
128 length: self.metadata.column(idx).byte_range().1 as usize,
129 data: offsets.into_iter().zip(chunks).collect(),
130 }))
131 }
132 }
133 }
134
135 pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
136 self.column_chunks
137 .iter()
138 .enumerate()
139 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
140 .map(|(idx, _chunk)| {
141 let column = self.metadata.column(idx);
142 let (start, length) = column.byte_range();
143 start..(start + length)
144 })
145 .collect::<Vec<_>>()
146 }
147
148 pub(crate) fn assign_dense_chunk(
151 &mut self,
152 projection: &ProjectionMask,
153 chunk_data: Vec<Bytes>,
154 ) {
155 let mut chunk_data = chunk_data.into_iter();
156
157 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
158 if chunk.is_some() || !projection.leaf_included(idx) {
159 continue;
160 }
161
162 let Some(data) = chunk_data.next() else {
164 continue;
165 };
166
167 let column = self.metadata.column(idx);
168 *chunk = Some(Arc::new(ColumnChunkData::Dense {
169 offset: column.byte_range().0 as usize,
170 data,
171 }));
172 }
173 }
174
175 pub(crate) fn column_reader(
177 &self,
178 col_idx: usize,
179 ) -> Result<SerializedPageReader<ColumnChunkData>> {
180 let page_reader = match &self.column_chunks[col_idx] {
181 None => {
182 return Err(ParquetError::General(format!(
183 "Invalid column index {col_idx}, column was not fetched"
184 )));
185 }
186 Some(data) => {
187 let page_locations = self
188 .offset_index
189 .filter(|index| !index.is_empty())
191 .map(|index| index[col_idx].page_locations.clone());
192 SerializedPageReader::new(
193 data.clone(),
194 self.metadata.column(col_idx),
195 self.row_count,
196 page_locations,
197 )?
198 }
199 };
200
201 Ok(page_reader)
202 }
203}
204
205pub struct InMemoryRowGroup<'a> {
207 region_id: RegionId,
208 file_id: FileId,
209 row_group_idx: usize,
210 cache_strategy: CacheStrategy,
211 file_path: &'a str,
212 object_store: ObjectStore,
214 base: RowGroupBase<'a>,
215}
216
217impl<'a> InMemoryRowGroup<'a> {
218 pub fn create(
223 region_id: RegionId,
224 file_id: FileId,
225 parquet_meta: &'a ParquetMetaData,
226 row_group_idx: usize,
227 cache_strategy: CacheStrategy,
228 file_path: &'a str,
229 object_store: ObjectStore,
230 ) -> Self {
231 Self {
232 region_id,
233 file_id,
234 row_group_idx,
235 cache_strategy,
236 file_path,
237 object_store,
238 base: RowGroupBase::new(parquet_meta, row_group_idx),
239 }
240 }
241
242 pub async fn fetch(
244 &mut self,
245 projection: &ProjectionMask,
246 selection: Option<&RowSelection>,
247 ) -> Result<()> {
248 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
249 let (fetch_ranges, page_start_offsets) =
250 self.base
251 .calc_sparse_read_ranges(projection, offset_index, selection);
252
253 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
254 self.base
256 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
257 } else {
258 yield_now().await;
261
262 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
264
265 if fetch_ranges.is_empty() {
266 return Ok(());
268 }
269
270 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
272
273 self.base.assign_dense_chunk(projection, chunk_data);
275 }
276
277 Ok(())
278 }
279
280 async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
283 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
285 let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
286 if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
287 return Ok(pages.compressed.clone());
288 }
289
290 let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
291 let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
292 Some(data) => data,
293 None => {
294 let _timer = READ_STAGE_ELAPSED
296 .with_label_values(&["cache_miss_read"])
297 .start_timer();
298
299 fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
300 .await
301 .map_err(|e| ParquetError::External(Box::new(e)))?
302 }
303 };
304
305 let total_range_size = compute_total_range_size(ranges);
307 let page_value = PageValue::new(pages.clone(), total_range_size);
308 self.cache_strategy
309 .put_pages(page_key, Arc::new(page_value));
310
311 Ok(pages)
312 }
313
314 async fn fetch_ranges_from_write_cache(
317 &self,
318 key: IndexKey,
319 ranges: &[Range<u64>],
320 ) -> Option<Vec<Bytes>> {
321 if let Some(cache) = self.cache_strategy.write_cache() {
322 return cache.file_cache().read_ranges(key, ranges).await;
323 }
324 None
325 }
326}
327
328fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
331 if ranges.is_empty() {
332 return 0;
333 }
334
335 let gap = MERGE_GAP as u64;
336 let mut sorted_ranges = ranges.to_vec();
337 sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
338
339 let mut total_size = 0;
340 let mut cur = sorted_ranges[0].clone();
341
342 for range in sorted_ranges.into_iter().skip(1) {
343 if range.start <= cur.end + gap {
344 cur.end = cur.end.max(range.end);
346 } else {
347 total_size += align_to_pooled_buf_size(cur.end - cur.start);
349 cur = range;
350 }
351 }
352
353 total_size += align_to_pooled_buf_size(cur.end - cur.start);
355
356 total_size
357}
358
/// Rounds `size` up to the next multiple of 2 MiB; multiples of 2 MiB
/// (including 0) are returned unchanged.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.next_multiple_of(POOLED_BUF_SIZE)
}
367
368impl RowGroups for InMemoryRowGroup<'_> {
369 fn num_rows(&self) -> usize {
370 self.base.row_count
371 }
372
373 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
374 let page_reader = self.base.column_reader(i)?;
376
377 Ok(Box::new(ColumnChunkIterator {
378 reader: Some(Ok(Box::new(page_reader))),
379 }))
380 }
381}
382
383#[derive(Clone)]
385pub(crate) enum ColumnChunkData {
386 Sparse {
388 length: usize,
390 data: Vec<(usize, Bytes)>,
393 },
394 Dense { offset: usize, data: Bytes },
396}
397
398impl ColumnChunkData {
399 fn get(&self, start: u64) -> Result<Bytes> {
400 match &self {
401 ColumnChunkData::Sparse { data, .. } => data
402 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
403 .map(|idx| data[idx].1.clone())
404 .map_err(|_| {
405 ParquetError::General(format!(
406 "Invalid offset in sparse column chunk data: {start}"
407 ))
408 }),
409 ColumnChunkData::Dense { offset, data } => {
410 let start = start as usize - *offset;
411 Ok(data.slice(start..))
412 }
413 }
414 }
415}
416
417impl Length for ColumnChunkData {
418 fn len(&self) -> u64 {
419 match &self {
420 ColumnChunkData::Sparse { length, .. } => *length as u64,
421 ColumnChunkData::Dense { data, .. } => data.len() as u64,
422 }
423 }
424}
425
426impl ChunkReader for ColumnChunkData {
427 type T = bytes::buf::Reader<Bytes>;
428
429 fn get_read(&self, start: u64) -> Result<Self::T> {
430 Ok(self.get(start)?.reader())
431 }
432
433 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
434 Ok(self.get(start)?.slice(..length))
435 }
436}
437
438pub(crate) struct ColumnChunkIterator {
440 pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
441}
442
443impl Iterator for ColumnChunkIterator {
444 type Item = Result<Box<dyn PageReader>>;
445
446 fn next(&mut self) -> Option<Self::Item> {
447 self.reader.take()
448 }
449}
450
451impl PageIterator for ColumnChunkIterator {}