1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
23use parquet::arrow::ProjectionMask;
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::reader::{ChunkReader, Length};
29use parquet::file::serialized_reader::SerializedPageReader;
30use store_api::storage::RegionId;
31use tokio::task::yield_now;
32
33use crate::cache::file_cache::{FileType, IndexKey};
34use crate::cache::{CacheStrategy, PageKey, PageValue};
35use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
36use crate::sst::file::FileId;
37use crate::sst::parquet::helper::{fetch_byte_ranges, MERGE_GAP};
38
/// Shared state for reading one row group: its metadata, its optional offset
/// index, and the column chunk data assigned so far.
pub(crate) struct RowGroupBase<'a> {
    /// Metadata of the row group.
    metadata: &'a RowGroupMetaData,
    /// Offset index entries of this row group, indexed by column. `None` when
    /// the file metadata has no offset index or an empty one.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Data of each column chunk, indexed by column. An entry stays `None`
    /// until one of the `assign_*_chunk` methods fills it.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in the row group, taken from the row group metadata.
    pub(crate) row_count: usize,
}
46
47impl<'a> RowGroupBase<'a> {
48 pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
49 let metadata = parquet_meta.row_group(row_group_idx);
50 let offset_index = parquet_meta
54 .offset_index()
55 .filter(|index| !index.is_empty())
57 .map(|x| x[row_group_idx].as_slice());
58
59 Self {
60 metadata,
61 offset_index,
62 column_chunks: vec![None; metadata.columns().len()],
63 row_count: metadata.num_rows() as usize,
64 }
65 }
66
67 pub(crate) fn calc_sparse_read_ranges(
68 &self,
69 projection: &ProjectionMask,
70 offset_index: &[OffsetIndexMetaData],
71 selection: &RowSelection,
72 ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
73 let mut page_start_offsets: Vec<Vec<usize>> = vec![];
76 let ranges = self
77 .column_chunks
78 .iter()
79 .zip(self.metadata.columns())
80 .enumerate()
81 .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
82 .flat_map(|(idx, (_chunk, chunk_meta))| {
83 let mut ranges = vec![];
86 let (start, _len) = chunk_meta.byte_range();
87 match offset_index[idx].page_locations.first() {
88 Some(first) if first.offset as u64 != start => {
89 ranges.push(start..first.offset as u64);
90 }
91 _ => (),
92 }
93
94 ranges.extend(
95 selection
96 .scan_ranges(&offset_index[idx].page_locations)
97 .iter()
98 .map(|range| range.start..range.end),
99 );
100 page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());
101
102 ranges
103 })
104 .collect::<Vec<_>>();
105 (ranges, page_start_offsets)
106 }
107
108 pub(crate) fn assign_sparse_chunk(
109 &mut self,
110 projection: &ProjectionMask,
111 data: Vec<Bytes>,
112 page_start_offsets: Vec<Vec<usize>>,
113 ) {
114 let mut page_start_offsets = page_start_offsets.into_iter();
115 let mut chunk_data = data.into_iter();
116
117 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
118 if chunk.is_some() || !projection.leaf_included(idx) {
119 continue;
120 }
121
122 if let Some(offsets) = page_start_offsets.next() {
123 let mut chunks = Vec::with_capacity(offsets.len());
124 for _ in 0..offsets.len() {
125 chunks.push(chunk_data.next().unwrap());
126 }
127
128 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
129 length: self.metadata.column(idx).byte_range().1 as usize,
130 data: offsets.into_iter().zip(chunks).collect(),
131 }))
132 }
133 }
134 }
135
136 pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
137 self.column_chunks
138 .iter()
139 .enumerate()
140 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
141 .map(|(idx, _chunk)| {
142 let column = self.metadata.column(idx);
143 let (start, length) = column.byte_range();
144 start..(start + length)
145 })
146 .collect::<Vec<_>>()
147 }
148
149 pub(crate) fn assign_dense_chunk(
152 &mut self,
153 projection: &ProjectionMask,
154 chunk_data: Vec<Bytes>,
155 ) {
156 let mut chunk_data = chunk_data.into_iter();
157
158 for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
159 if chunk.is_some() || !projection.leaf_included(idx) {
160 continue;
161 }
162
163 let Some(data) = chunk_data.next() else {
165 continue;
166 };
167
168 let column = self.metadata.column(idx);
169 *chunk = Some(Arc::new(ColumnChunkData::Dense {
170 offset: column.byte_range().0 as usize,
171 data,
172 }));
173 }
174 }
175
176 pub(crate) fn column_reader(
178 &self,
179 col_idx: usize,
180 ) -> Result<SerializedPageReader<ColumnChunkData>> {
181 let page_reader = match &self.column_chunks[col_idx] {
182 None => {
183 return Err(ParquetError::General(format!(
184 "Invalid column index {col_idx}, column was not fetched"
185 )))
186 }
187 Some(data) => {
188 let page_locations = self
189 .offset_index
190 .filter(|index| !index.is_empty())
192 .map(|index| index[col_idx].page_locations.clone());
193 SerializedPageReader::new(
194 data.clone(),
195 self.metadata.column(col_idx),
196 self.row_count,
197 page_locations,
198 )?
199 }
200 };
201
202 Ok(page_reader)
203 }
204}
205
/// A row group whose column chunk data is fetched into memory before it is read.
pub struct InMemoryRowGroup<'a> {
    /// Region id; part of the key used to look up the write cache.
    region_id: RegionId,
    /// File id; part of both the page cache key and the write cache key.
    file_id: FileId,
    /// Index of the row group inside the file; part of the page cache key.
    row_group_idx: usize,
    /// Cache handle used to look up and store pages and to reach the write cache.
    cache_strategy: CacheStrategy,
    /// Path handed to `fetch_byte_ranges` when reading from the object store.
    file_path: &'a str,
    /// Object store to read byte ranges from when no cache serves them.
    object_store: ObjectStore,
    /// Row group metadata together with the column chunks assigned so far.
    base: RowGroupBase<'a>,
}
217
218impl<'a> InMemoryRowGroup<'a> {
219 pub fn create(
224 region_id: RegionId,
225 file_id: FileId,
226 parquet_meta: &'a ParquetMetaData,
227 row_group_idx: usize,
228 cache_strategy: CacheStrategy,
229 file_path: &'a str,
230 object_store: ObjectStore,
231 ) -> Self {
232 Self {
233 region_id,
234 file_id,
235 row_group_idx,
236 cache_strategy,
237 file_path,
238 object_store,
239 base: RowGroupBase::new(parquet_meta, row_group_idx),
240 }
241 }
242
243 pub async fn fetch(
245 &mut self,
246 projection: &ProjectionMask,
247 selection: Option<&RowSelection>,
248 ) -> Result<()> {
249 if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
250 let (fetch_ranges, page_start_offsets) =
251 self.base
252 .calc_sparse_read_ranges(projection, offset_index, selection);
253
254 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
255 self.base
257 .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
258 } else {
259 yield_now().await;
262
263 let fetch_ranges = self.base.calc_dense_read_ranges(projection);
265
266 if fetch_ranges.is_empty() {
267 return Ok(());
269 }
270
271 let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
273
274 self.base.assign_dense_chunk(projection, chunk_data);
276 }
277
278 Ok(())
279 }
280
281 async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
284 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
286 let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
287 if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
288 return Ok(pages.compressed.clone());
289 }
290
291 let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
292 let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
293 Some(data) => data,
294 None => {
295 let _timer = READ_STAGE_ELAPSED
297 .with_label_values(&["cache_miss_read"])
298 .start_timer();
299 let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
300 .await
301 .map_err(|e| ParquetError::External(Box::new(e)))?;
302 data
303 }
304 };
305
306 let total_range_size = compute_total_range_size(ranges);
308 let page_value = PageValue::new(pages.clone(), total_range_size);
309 self.cache_strategy
310 .put_pages(page_key, Arc::new(page_value));
311
312 Ok(pages)
313 }
314
315 async fn fetch_ranges_from_write_cache(
318 &self,
319 key: IndexKey,
320 ranges: &[Range<u64>],
321 ) -> Option<Vec<Bytes>> {
322 if let Some(cache) = self.cache_strategy.write_cache() {
323 return cache.file_cache().read_ranges(key, ranges).await;
324 }
325 None
326 }
327}
328
329fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
332 if ranges.is_empty() {
333 return 0;
334 }
335
336 let gap = MERGE_GAP as u64;
337 let mut sorted_ranges = ranges.to_vec();
338 sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
339
340 let mut total_size = 0;
341 let mut cur = sorted_ranges[0].clone();
342
343 for range in sorted_ranges.into_iter().skip(1) {
344 if range.start <= cur.end + gap {
345 cur.end = cur.end.max(range.end);
347 } else {
348 total_size += align_to_pooled_buf_size(cur.end - cur.start);
350 cur = range;
351 }
352 }
353
354 total_size += align_to_pooled_buf_size(cur.end - cur.start);
356
357 total_size
358}
359
/// Rounds `size` up to the next multiple of 2 MiB.
///
/// A `size` that is already a multiple (including 0) is returned unchanged.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const BUF_BYTES: u64 = 2 << 20;
    // Number of whole buffers needed to hold `size` bytes.
    let buf_count = size.div_ceil(BUF_BYTES);
    buf_count * BUF_BYTES
}
368
369impl RowGroups for InMemoryRowGroup<'_> {
370 fn num_rows(&self) -> usize {
371 self.base.row_count
372 }
373
374 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
375 let page_reader = self.base.column_reader(i)?;
377
378 Ok(Box::new(ColumnChunkIterator {
379 reader: Some(Ok(Box::new(page_reader))),
380 }))
381 }
382}
383
/// Bytes of one column chunk, held either as separately fetched ranges or as
/// a single buffer.
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data of which only some ranges are held.
    Sparse {
        /// Value reported as the length of this chunk.
        length: usize,
        /// `(start offset, bytes)` pairs. Lookups use a binary search on the
        /// start offset, so the entries are expected to be sorted by it.
        data: Vec<(usize, Bytes)>,
    },
    /// Column chunk data held in one buffer; `offset` is the position that
    /// corresponds to the first byte of `data`.
    Dense { offset: usize, data: Bytes },
}
398
399impl ColumnChunkData {
400 fn get(&self, start: u64) -> Result<Bytes> {
401 match &self {
402 ColumnChunkData::Sparse { data, .. } => data
403 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
404 .map(|idx| data[idx].1.clone())
405 .map_err(|_| {
406 ParquetError::General(format!(
407 "Invalid offset in sparse column chunk data: {start}"
408 ))
409 }),
410 ColumnChunkData::Dense { offset, data } => {
411 let start = start as usize - *offset;
412 Ok(data.slice(start..))
413 }
414 }
415 }
416}
417
418impl Length for ColumnChunkData {
419 fn len(&self) -> u64 {
420 match &self {
421 ColumnChunkData::Sparse { length, .. } => *length as u64,
422 ColumnChunkData::Dense { data, .. } => data.len() as u64,
423 }
424 }
425}
426
427impl ChunkReader for ColumnChunkData {
428 type T = bytes::buf::Reader<Bytes>;
429
430 fn get_read(&self, start: u64) -> Result<Self::T> {
431 Ok(self.get(start)?.reader())
432 }
433
434 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
435 Ok(self.get(start)?.slice(..length))
436 }
437}
438
/// A page iterator that yields at most one page reader.
pub(crate) struct ColumnChunkIterator {
    /// The reader (or the error produced while creating it) still to be
    /// yielded; `None` once it has been taken.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
443
444impl Iterator for ColumnChunkIterator {
445 type Item = Result<Box<dyn PageReader>>;
446
447 fn next(&mut self) -> Option<Self::Item> {
448 self.reader.take()
449 }
450}
451
// Marks `ColumnChunkIterator` as a `PageIterator`; the impl has no items of its own.
impl PageIterator for ColumnChunkIterator {}