1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::ProjectionMask;
23use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::reader::{ChunkReader, Length};
29use parquet::file::serialized_reader::SerializedPageReader;
30use store_api::storage::{FileId, RegionId};
31use tokio::task::yield_now;
32
33use crate::cache::file_cache::{FileType, IndexKey};
34use crate::cache::{CacheStrategy, PageKey, PageValue};
35use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
36use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
37
/// Per-read metrics collected while fetching parquet pages for a row group.
///
/// Size counters are in bytes; elapsed counters accumulate wall-clock time.
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    // Number of fetches served entirely from the in-memory page cache.
    pub page_cache_hit: usize,
    // Number of fetches served from the write (file) cache.
    pub write_cache_hit: usize,
    // Number of fetches that had to go to the underlying object store.
    pub cache_miss: usize,
    // Number of page ranges returned from the in-memory page cache.
    pub pages_to_fetch_mem: usize,
    // Total bytes of the ranges served from the in-memory page cache.
    pub page_size_to_fetch_mem: u64,
    // Number of page ranges read from the write cache.
    pub pages_to_fetch_write_cache: usize,
    // Bytes read from the write cache (merged-range total, see
    // `compute_total_range_size`).
    pub page_size_to_fetch_write_cache: u64,
    // Number of page ranges read from the object store.
    pub pages_to_fetch_store: usize,
    // Bytes read from the object store (merged-range total).
    pub page_size_to_fetch_store: u64,
    // Bytes actually requested by the reader (sum of the raw ranges).
    pub page_size_needed: u64,
    // Time spent reading from the write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    // Time spent reading from the object store.
    pub store_fetch_elapsed: std::time::Duration,
    // Total fetch time; also used by `is_empty` to detect whether any
    // activity was recorded. NOTE(review): not updated in this file except
    // by `merge_from` — presumably set by the caller; confirm.
    pub total_fetch_elapsed: std::time::Duration,
}
68
69impl ParquetFetchMetricsData {
70 fn is_empty(&self) -> bool {
72 self.total_fetch_elapsed.is_zero()
73 }
74}
75
/// Thread-safe wrapper around [`ParquetFetchMetricsData`].
///
/// The inner data is guarded by a mutex so counters can be updated
/// through a shared `&self` reference.
#[derive(Default)]
pub struct ParquetFetchMetrics {
    // Guarded metrics counters.
    pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}
81
82impl std::fmt::Debug for ParquetFetchMetrics {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let data = self.data.lock().unwrap();
85 if data.is_empty() {
86 return write!(f, "{{}}");
87 }
88
89 let ParquetFetchMetricsData {
90 page_cache_hit,
91 write_cache_hit,
92 cache_miss,
93 pages_to_fetch_mem,
94 page_size_to_fetch_mem,
95 pages_to_fetch_write_cache,
96 page_size_to_fetch_write_cache,
97 pages_to_fetch_store,
98 page_size_to_fetch_store,
99 page_size_needed,
100 write_cache_fetch_elapsed,
101 store_fetch_elapsed,
102 total_fetch_elapsed,
103 } = *data;
104
105 write!(f, "{{")?;
106
107 write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;
108
109 if page_cache_hit > 0 {
110 write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
111 }
112 if write_cache_hit > 0 {
113 write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
114 }
115 if cache_miss > 0 {
116 write!(f, ", \"cache_miss\":{}", cache_miss)?;
117 }
118 if pages_to_fetch_mem > 0 {
119 write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
120 }
121 if page_size_to_fetch_mem > 0 {
122 write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
123 }
124 if pages_to_fetch_write_cache > 0 {
125 write!(
126 f,
127 ", \"pages_to_fetch_write_cache\":{}",
128 pages_to_fetch_write_cache
129 )?;
130 }
131 if page_size_to_fetch_write_cache > 0 {
132 write!(
133 f,
134 ", \"page_size_to_fetch_write_cache\":{}",
135 page_size_to_fetch_write_cache
136 )?;
137 }
138 if pages_to_fetch_store > 0 {
139 write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
140 }
141 if page_size_to_fetch_store > 0 {
142 write!(
143 f,
144 ", \"page_size_to_fetch_store\":{}",
145 page_size_to_fetch_store
146 )?;
147 }
148 if page_size_needed > 0 {
149 write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
150 }
151 if !write_cache_fetch_elapsed.is_zero() {
152 write!(
153 f,
154 ", \"write_cache_fetch_elapsed\":\"{:?}\"",
155 write_cache_fetch_elapsed
156 )?;
157 }
158 if !store_fetch_elapsed.is_zero() {
159 write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
160 }
161
162 write!(f, "}}")
163 }
164}
165
166impl ParquetFetchMetrics {
167 pub fn is_empty(&self) -> bool {
169 self.data.lock().unwrap().is_empty()
170 }
171
172 pub fn merge_from(&self, other: &ParquetFetchMetrics) {
174 let ParquetFetchMetricsData {
175 page_cache_hit,
176 write_cache_hit,
177 cache_miss,
178 pages_to_fetch_mem,
179 page_size_to_fetch_mem,
180 pages_to_fetch_write_cache,
181 page_size_to_fetch_write_cache,
182 pages_to_fetch_store,
183 page_size_to_fetch_store,
184 page_size_needed,
185 write_cache_fetch_elapsed,
186 store_fetch_elapsed,
187 total_fetch_elapsed,
188 } = *other.data.lock().unwrap();
189
190 let mut data = self.data.lock().unwrap();
191 data.page_cache_hit += page_cache_hit;
192 data.write_cache_hit += write_cache_hit;
193 data.cache_miss += cache_miss;
194 data.pages_to_fetch_mem += pages_to_fetch_mem;
195 data.page_size_to_fetch_mem += page_size_to_fetch_mem;
196 data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
197 data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
198 data.pages_to_fetch_store += pages_to_fetch_store;
199 data.page_size_to_fetch_store += page_size_to_fetch_store;
200 data.page_size_needed += page_size_needed;
201 data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
202 data.store_fetch_elapsed += store_fetch_elapsed;
203 data.total_fetch_elapsed += total_fetch_elapsed;
204 }
205}
206
/// Shared state for reading one parquet row group into memory.
///
/// Tracks which column chunks have been fetched so far and the optional
/// page offset index used for sparse (page-level) reads.
pub(crate) struct RowGroupBase<'a> {
    // Metadata of the row group being read.
    metadata: &'a RowGroupMetaData,
    // Offset index (page locations) for this row group, if present.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Fetched data per column; `None` until that chunk is downloaded.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    // Number of rows in the row group.
    pub(crate) row_count: usize,
}
214
impl<'a> RowGroupBase<'a> {
    /// Creates the state for `row_group_idx` of `parquet_meta`, with no
    /// column chunks fetched yet.
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // Only keep the offset index when it is actually populated.
        let offset_index = parquet_meta
            .offset_index()
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            metadata,
            offset_index,
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    /// Computes the byte ranges needed for a sparse (page-selective) read.
    ///
    /// Returns the ranges to fetch plus, per projected column, the start
    /// offsets of each fetched range. The offsets are required later by
    /// [`Self::assign_sparse_chunk`] to rebuild the sparse chunk layout, so
    /// both values must be kept together.
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.metadata.columns())
            .enumerate()
            // Skip columns already fetched or excluded by the projection.
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                // If the chunk starts before the first data page, also fetch
                // that prefix (presumably the dictionary page — the offset
                // index lists only data pages; confirm against parquet docs).
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                // Ranges of the data pages touched by the row selection.
                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    /// Installs fetched sparse page buffers into `column_chunks`.
    ///
    /// `data` and `page_start_offsets` must come from the same
    /// [`Self::calc_sparse_read_ranges`] call: buffers are consumed in
    /// exactly the column order the ranges were produced.
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            // Same filter as in `calc_sparse_read_ranges`, keeping the two
            // iterations aligned column-by-column.
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                // One fetched buffer per start offset of this column.
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: self.metadata.column(idx).byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    /// Computes the full byte range of every projected, not-yet-fetched
    /// column chunk (dense read).
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.metadata.column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Installs fetched whole-chunk buffers into `column_chunks`.
    ///
    /// Buffers are consumed in the same column order that
    /// [`Self::calc_dense_read_ranges`] produced the ranges.
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Fewer buffers than columns simply leaves the rest unfetched.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self.metadata.column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Creates a page reader over the fetched chunk of column `col_idx`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if that column chunk was not
    /// fetched beforehand.
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                // Page locations from the offset index, when available.
                let page_locations = self
                    .offset_index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }
}
373
/// An in-memory collection of column chunks for one row group, fetched on
/// demand from caches or the object store.
pub struct InMemoryRowGroup<'a> {
    // Region the SST file belongs to.
    region_id: RegionId,
    // Id of the SST file being read.
    file_id: FileId,
    // Index of the row group inside the file.
    row_group_idx: usize,
    // Cache layers (page cache / write cache) consulted before the store.
    cache_strategy: CacheStrategy,
    // Path of the file in the object store.
    file_path: &'a str,
    // Store to read from on cache miss.
    object_store: ObjectStore,
    // Shared row-group read state (chunks, offset index, row count).
    base: RowGroupBase<'a>,
}
385
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a reader over `row_group_idx` of the given file.
    ///
    /// No data is downloaded here; call [`Self::fetch`] to pull in the
    /// projected column chunks.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the column chunks selected by `projection`.
    ///
    /// When both a row `selection` and an offset index are available, only
    /// the pages the selection touches are fetched (sparse read); otherwise
    /// whole column chunks are fetched (dense read). `metrics`, when
    /// provided, records where the bytes came from.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Cooperatively yield before the (potentially large) dense fetch.
            yield_now().await;

            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Everything needed is already in `column_chunks`.
                return Ok(());
            }

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Reads `ranges` from the fastest available source: in-memory page
    /// cache, then write cache, then the object store. The result is put
    /// back into the page cache so later reads of the same ranges hit
    /// memory.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();

        // Fast path: all requested ranges cached in memory for this row group.
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Size estimates after merging nearby ranges: pooled-buffer aligned
        // total (for cache accounting) and the exact merged total (for metrics).
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);

        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        // Timing is only taken when metrics collection is requested.
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Cache miss: read from the underlying object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();

                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Populate the in-memory page cache for subsequent reads.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Reads `ranges` from the write cache; returns `None` when the write
    /// cache is disabled or cannot serve the ranges.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
540
541fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
547 if ranges.is_empty() {
548 return (0, 0);
549 }
550
551 let gap = MERGE_GAP as u64;
552 let mut sorted_ranges = ranges.to_vec();
553 sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
554
555 let mut total_size_aligned = 0;
556 let mut total_size_unaligned = 0;
557 let mut cur = sorted_ranges[0].clone();
558
559 for range in sorted_ranges.into_iter().skip(1) {
560 if range.start <= cur.end + gap {
561 cur.end = cur.end.max(range.end);
563 } else {
564 let range_size = cur.end - cur.start;
566 total_size_aligned += align_to_pooled_buf_size(range_size);
567 total_size_unaligned += range_size;
568 cur = range;
569 }
570 }
571
572 let range_size = cur.end - cur.start;
574 total_size_aligned += align_to_pooled_buf_size(range_size);
575 total_size_unaligned += range_size;
576
577 (total_size_aligned, total_size_unaligned)
578}
579
/// Rounds `size` up to the next multiple of the 2 MiB pooled buffer size.
///
/// A size of zero stays zero.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.next_multiple_of(POOLED_BUF_SIZE)
}
588
589impl RowGroups for InMemoryRowGroup<'_> {
590 fn num_rows(&self) -> usize {
591 self.base.row_count
592 }
593
594 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
595 let page_reader = self.base.column_reader(i)?;
597
598 Ok(Box::new(ColumnChunkIterator {
599 reader: Some(Ok(Box::new(page_reader))),
600 }))
601 }
602}
603
/// Data of a single column chunk, either fully or partially fetched.
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Only selected pages of the chunk were fetched.
    Sparse {
        // Total length of the column chunk in bytes.
        length: usize,
        // Fetched buffers keyed by their absolute start offset; must be
        // sorted ascending for the binary search in `get`.
        data: Vec<(usize, Bytes)>,
    },
    /// The whole chunk, starting at byte `offset` within the file.
    Dense { offset: usize, data: Bytes },
}
618
619impl ColumnChunkData {
620 fn get(&self, start: u64) -> Result<Bytes> {
621 match &self {
622 ColumnChunkData::Sparse { data, .. } => data
623 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
624 .map(|idx| data[idx].1.clone())
625 .map_err(|_| {
626 ParquetError::General(format!(
627 "Invalid offset in sparse column chunk data: {start}"
628 ))
629 }),
630 ColumnChunkData::Dense { offset, data } => {
631 let start = start as usize - *offset;
632 Ok(data.slice(start..))
633 }
634 }
635 }
636}
637
638impl Length for ColumnChunkData {
639 fn len(&self) -> u64 {
640 match &self {
641 ColumnChunkData::Sparse { length, .. } => *length as u64,
642 ColumnChunkData::Dense { data, .. } => data.len() as u64,
643 }
644 }
645}
646
647impl ChunkReader for ColumnChunkData {
648 type T = bytes::buf::Reader<Bytes>;
649
650 fn get_read(&self, start: u64) -> Result<Self::T> {
651 Ok(self.get(start)?.reader())
652 }
653
654 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
655 Ok(self.get(start)?.slice(..length))
656 }
657}
658
/// A one-shot iterator yielding a single [`PageReader`] for a column chunk.
pub(crate) struct ColumnChunkIterator {
    // The reader to yield; taken on the first call to `next`.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
663
664impl Iterator for ColumnChunkIterator {
665 type Item = Result<Box<dyn PageReader>>;
666
667 fn next(&mut self) -> Option<Self::Item> {
668 self.reader.take()
669 }
670}
671
672impl PageIterator for ColumnChunkIterator {}