1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
23use parquet::arrow::ProjectionMask;
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::properties::DEFAULT_PAGE_SIZE;
29use parquet::file::reader::{ChunkReader, Length};
30use parquet::file::serialized_reader::SerializedPageReader;
31use store_api::storage::RegionId;
32use tokio::task::yield_now;
33
34use crate::cache::file_cache::{FileType, IndexKey};
35use crate::cache::{CacheStrategy, PageKey, PageValue};
36use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
37use crate::sst::file::FileId;
38use crate::sst::parquet::helper::fetch_byte_ranges;
39use crate::sst::parquet::page_reader::RowGroupCachedReader;
40
41pub(crate) struct RowGroupBase<'a> {
42 metadata: &'a RowGroupMetaData,
43 pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
44 column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
46 pub(crate) row_count: usize,
47 column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
52}
53
54impl<'a> RowGroupBase<'a> {
55 pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
56 let metadata = parquet_meta.row_group(row_group_idx);
57 let offset_index = parquet_meta
61 .offset_index()
62 .filter(|index| !index.is_empty())
64 .map(|x| x[row_group_idx].as_slice());
65
66 Self {
67 metadata,
68 offset_index,
69 column_chunks: vec![None; metadata.columns().len()],
70 row_count: metadata.num_rows() as usize,
71 column_uncompressed_pages: vec![None; metadata.columns().len()],
72 }
73 }
74
75 pub(crate) fn calc_sparse_read_ranges(
76 &self,
77 projection: &ProjectionMask,
78 offset_index: &[OffsetIndexMetaData],
79 selection: &RowSelection,
80 ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
81 let mut page_start_offsets: Vec<Vec<usize>> = vec![];
84 let ranges = self
85 .column_chunks
86 .iter()
87 .zip(self.metadata.columns())
88 .enumerate()
89 .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
90 .flat_map(|(idx, (_chunk, chunk_meta))| {
91 let mut ranges = vec![];
94 let (start, _len) = chunk_meta.byte_range();
95 match offset_index[idx].page_locations.first() {
96 Some(first) if first.offset as u64 != start => {
97 ranges.push(start..first.offset as u64);
98 }
99 _ => (),
100 }
101
102 ranges.extend(
103 selection
104 .scan_ranges(&offset_index[idx].page_locations)
105 .iter()
106 .map(|range| range.start as u64..range.end as u64),
107 );
108 page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());
109
110 ranges
111 })
112 .collect::<Vec<_>>();
113 (ranges, page_start_offsets)
114 }
115
116 pub(crate) fn assign_sparse_chunk(
117 &mut self,
118 projection: &ProjectionMask,
119 data: Vec<Bytes>,
120 page_start_offsets: Vec<Vec<usize>>,
121 ) {
122 let mut page_start_offsets = page_start_offsets.into_iter();
123 let mut chunk_data = data.into_iter();
124
125 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
126 if chunk.is_some() || !projection.leaf_included(idx) {
127 continue;
128 }
129
130 if let Some(offsets) = page_start_offsets.next() {
131 let mut chunks = Vec::with_capacity(offsets.len());
132 for _ in 0..offsets.len() {
133 chunks.push(chunk_data.next().unwrap());
134 }
135
136 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
137 length: self.metadata.column(idx).byte_range().1 as usize,
138 data: offsets.into_iter().zip(chunks).collect(),
139 }))
140 }
141 }
142 }
143
144 pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
145 self.column_chunks
146 .iter()
147 .zip(&self.column_uncompressed_pages)
148 .enumerate()
149 .filter(|&(idx, (chunk, uncompressed_pages))| {
150 chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
152 })
153 .map(|(idx, (_chunk, _pages))| {
154 let column = self.metadata.column(idx);
155 let (start, length) = column.byte_range();
156 start..(start + length)
157 })
158 .collect::<Vec<_>>()
159 }
160
161 pub(crate) fn assign_dense_chunk(
164 &mut self,
165 projection: &ProjectionMask,
166 chunk_data: Vec<Bytes>,
167 ) -> Vec<(usize, Bytes)> {
168 let mut chunk_data = chunk_data.into_iter();
169 let mut res = vec![];
170
171 for (idx, (chunk, row_group_pages)) in self
172 .column_chunks
173 .iter_mut()
174 .zip(&self.column_uncompressed_pages)
175 .enumerate()
176 {
177 if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
178 continue;
179 }
180
181 let Some(data) = chunk_data.next() else {
183 continue;
184 };
185
186 let column = self.metadata.column(idx);
187 res.push((idx, data.clone()));
188 *chunk = Some(Arc::new(ColumnChunkData::Dense {
189 offset: column.byte_range().0 as usize,
190 data,
191 }));
192 }
193 res
194 }
195
196 pub(crate) fn column_reader(
198 &self,
199 col_idx: usize,
200 ) -> Result<SerializedPageReader<ColumnChunkData>> {
201 let page_reader = match &self.column_chunks[col_idx] {
202 None => {
203 return Err(ParquetError::General(format!(
204 "Invalid column index {col_idx}, column was not fetched"
205 )))
206 }
207 Some(data) => {
208 let page_locations = self
209 .offset_index
210 .filter(|index| !index.is_empty())
212 .map(|index| index[col_idx].page_locations.clone());
213 SerializedPageReader::new(
214 data.clone(),
215 self.metadata.column(col_idx),
216 self.row_count,
217 page_locations,
218 )?
219 }
220 };
221
222 Ok(page_reader)
224 }
225}
226
227pub struct InMemoryRowGroup<'a> {
229 region_id: RegionId,
230 file_id: FileId,
231 row_group_idx: usize,
232 cache_strategy: CacheStrategy,
233 file_path: &'a str,
234 object_store: ObjectStore,
236 base: RowGroupBase<'a>,
237}
238
239impl<'a> InMemoryRowGroup<'a> {
240 pub fn create(
245 region_id: RegionId,
246 file_id: FileId,
247 parquet_meta: &'a ParquetMetaData,
248 row_group_idx: usize,
249 cache_strategy: CacheStrategy,
250 file_path: &'a str,
251 object_store: ObjectStore,
252 ) -> Self {
253 Self {
254 region_id,
255 file_id,
256 row_group_idx,
257 cache_strategy,
258 file_path,
259 object_store,
260 base: RowGroupBase::new(parquet_meta, row_group_idx),
261 }
262 }
263
264 pub async fn fetch(
266 &mut self,
267 projection: &ProjectionMask,
268 selection: Option<&RowSelection>,
269 ) -> Result<()> {
270 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
271 let (fetch_ranges, page_start_offsets) =
272 self.base
273 .calc_sparse_read_ranges(projection, offset_index, selection);
274
275 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
276 self.base
278 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
279 } else {
280 self.fetch_pages_from_cache(projection);
282
283 yield_now().await;
286
287 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
289
290 if fetch_ranges.is_empty() {
291 return Ok(());
293 }
294
295 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
297
298 let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
300
301 for (col_idx, data) in assigned_columns {
303 let column = self.base.metadata.column(col_idx);
304 if !cache_uncompressed_pages(column) {
305 let page_key = PageKey::new_compressed(
308 self.region_id,
309 self.file_id,
310 self.row_group_idx,
311 col_idx,
312 );
313 self.cache_strategy
314 .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
315 }
316 }
317 }
318
319 Ok(())
320 }
321
322 fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
325 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
326 self.base
327 .column_chunks
328 .iter_mut()
329 .enumerate()
330 .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
331 .for_each(|(idx, chunk)| {
332 let column = self.base.metadata.column(idx);
333 if cache_uncompressed_pages(column) {
334 let page_key = PageKey::new_uncompressed(
336 self.region_id,
337 self.file_id,
338 self.row_group_idx,
339 idx,
340 );
341 self.base.column_uncompressed_pages[idx] =
342 self.cache_strategy.get_pages(&page_key);
343 } else {
344 let page_key = PageKey::new_compressed(
346 self.region_id,
347 self.file_id,
348 self.row_group_idx,
349 idx,
350 );
351
352 *chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| {
353 Arc::new(ColumnChunkData::Dense {
354 offset: column.byte_range().0 as usize,
355 data: page_value.compressed.clone(),
356 })
357 });
358 }
359 });
360 }
361
362 async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
365 let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
366 match self.fetch_ranges_from_write_cache(key, ranges).await {
367 Some(data) => Ok(data),
368 None => {
369 let _timer = READ_STAGE_ELAPSED
371 .with_label_values(&["cache_miss_read"])
372 .start_timer();
373 let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
374 .await
375 .map_err(|e| ParquetError::External(Box::new(e)))?;
376 Ok(data)
377 }
378 }
379 }
380
381 async fn fetch_ranges_from_write_cache(
384 &self,
385 key: IndexKey,
386 ranges: &[Range<u64>],
387 ) -> Option<Vec<Bytes>> {
388 if let Some(cache) = self.cache_strategy.write_cache() {
389 return cache.file_cache().read_ranges(key, ranges).await;
390 }
391 None
392 }
393
394 fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
396 if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
397 debug_assert!(!cached_pages.row_group.is_empty());
398 return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
400 }
401
402 let page_reader = self.base.column_reader(i)?;
403
404 let column = self.base.metadata.column(i);
405 if cache_uncompressed_pages(column) {
406 let pages = page_reader.collect::<Result<Vec<_>>>()?;
409 let page_value = Arc::new(PageValue::new_row_group(pages));
410 let page_key =
411 PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
412 self.cache_strategy.put_pages(page_key, page_value.clone());
413
414 return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
415 }
416
417 Ok(Box::new(page_reader))
419 }
420}
421
422fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
424 column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
427}
428
429impl RowGroups for InMemoryRowGroup<'_> {
430 fn num_rows(&self) -> usize {
431 self.base.row_count
432 }
433
434 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
435 let page_reader = self.column_page_reader(i)?;
436
437 Ok(Box::new(ColumnChunkIterator {
438 reader: Some(Ok(page_reader)),
439 }))
440 }
441}
442
443#[derive(Clone)]
445pub(crate) enum ColumnChunkData {
446 Sparse {
448 length: usize,
450 data: Vec<(usize, Bytes)>,
453 },
454 Dense { offset: usize, data: Bytes },
456}
457
458impl ColumnChunkData {
459 fn get(&self, start: u64) -> Result<Bytes> {
460 match &self {
461 ColumnChunkData::Sparse { data, .. } => data
462 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
463 .map(|idx| data[idx].1.clone())
464 .map_err(|_| {
465 ParquetError::General(format!(
466 "Invalid offset in sparse column chunk data: {start}"
467 ))
468 }),
469 ColumnChunkData::Dense { offset, data } => {
470 let start = start as usize - *offset;
471 Ok(data.slice(start..))
472 }
473 }
474 }
475}
476
477impl Length for ColumnChunkData {
478 fn len(&self) -> u64 {
479 match &self {
480 ColumnChunkData::Sparse { length, .. } => *length as u64,
481 ColumnChunkData::Dense { data, .. } => data.len() as u64,
482 }
483 }
484}
485
486impl ChunkReader for ColumnChunkData {
487 type T = bytes::buf::Reader<Bytes>;
488
489 fn get_read(&self, start: u64) -> Result<Self::T> {
490 Ok(self.get(start)?.reader())
491 }
492
493 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
494 Ok(self.get(start)?.slice(..length))
495 }
496}
497
498pub(crate) struct ColumnChunkIterator {
500 pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
501}
502
503impl Iterator for ColumnChunkIterator {
504 type Item = Result<Box<dyn PageReader>>;
505
506 fn next(&mut self) -> Option<Self::Item> {
507 self.reader.take()
508 }
509}
510
511impl PageIterator for ColumnChunkIterator {}