1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::ProjectionMask;
23use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::reader::{ChunkReader, Length};
29use parquet::file::serialized_reader::SerializedPageReader;
30use store_api::storage::{FileId, RegionId};
31use tokio::task::yield_now;
32
33use crate::cache::file_cache::{FileType, IndexKey};
34use crate::cache::{CacheStrategy, PageKey, PageValue};
35use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
36use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
37
/// Per-read metrics describing how parquet pages were served: from the
/// in-memory page cache, the write cache, or the underlying object store.
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    /// Fetches fully served by the in-memory page cache.
    pub page_cache_hit: usize,
    /// Fetches served by the write cache.
    pub write_cache_hit: usize,
    /// Fetches that had to go to the object store.
    pub cache_miss: usize,
    /// Pages read from the in-memory page cache.
    pub pages_to_fetch_mem: usize,
    /// Bytes read from the in-memory page cache.
    pub page_size_to_fetch_mem: u64,
    /// Pages read from the write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Bytes read from the write cache (merged-range size).
    pub page_size_to_fetch_write_cache: u64,
    /// Pages read from the object store.
    pub pages_to_fetch_store: usize,
    /// Bytes read from the object store (merged-range size).
    pub page_size_to_fetch_store: u64,
    /// Bytes actually required by the read, before range merging.
    pub page_size_needed: u64,
    /// Time spent reading from the write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Time spent reading from the object store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total time spent fetching pages; a zero value is treated as
    /// "no data recorded".
    pub total_fetch_elapsed: std::time::Duration,
}

impl ParquetFetchMetricsData {
    /// Returns true when nothing has been recorded: `total_fetch_elapsed`
    /// doubles as the emptiness flag.
    fn is_empty(&self) -> bool {
        self.total_fetch_elapsed == std::time::Duration::ZERO
    }
}
75
/// Thread-safe wrapper around [`ParquetFetchMetricsData`] so fetch metrics
/// can be updated through a shared reference (e.g. from concurrent reads).
#[derive(Default)]
pub struct ParquetFetchMetrics {
    /// Mutex-protected metrics payload; each update holds the lock briefly.
    pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}
81
82impl std::fmt::Debug for ParquetFetchMetrics {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let data = self.data.lock().unwrap();
85 if data.is_empty() {
86 return write!(f, "{{}}");
87 }
88
89 let ParquetFetchMetricsData {
90 page_cache_hit,
91 write_cache_hit,
92 cache_miss,
93 pages_to_fetch_mem,
94 page_size_to_fetch_mem,
95 pages_to_fetch_write_cache,
96 page_size_to_fetch_write_cache,
97 pages_to_fetch_store,
98 page_size_to_fetch_store,
99 page_size_needed,
100 write_cache_fetch_elapsed,
101 store_fetch_elapsed,
102 total_fetch_elapsed,
103 } = *data;
104
105 write!(f, "{{")?;
106
107 write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;
108
109 if page_cache_hit > 0 {
110 write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
111 }
112 if write_cache_hit > 0 {
113 write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
114 }
115 if cache_miss > 0 {
116 write!(f, ", \"cache_miss\":{}", cache_miss)?;
117 }
118 if pages_to_fetch_mem > 0 {
119 write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
120 }
121 if page_size_to_fetch_mem > 0 {
122 write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
123 }
124 if pages_to_fetch_write_cache > 0 {
125 write!(
126 f,
127 ", \"pages_to_fetch_write_cache\":{}",
128 pages_to_fetch_write_cache
129 )?;
130 }
131 if page_size_to_fetch_write_cache > 0 {
132 write!(
133 f,
134 ", \"page_size_to_fetch_write_cache\":{}",
135 page_size_to_fetch_write_cache
136 )?;
137 }
138 if pages_to_fetch_store > 0 {
139 write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
140 }
141 if page_size_to_fetch_store > 0 {
142 write!(
143 f,
144 ", \"page_size_to_fetch_store\":{}",
145 page_size_to_fetch_store
146 )?;
147 }
148 if page_size_needed > 0 {
149 write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
150 }
151 if !write_cache_fetch_elapsed.is_zero() {
152 write!(
153 f,
154 ", \"write_cache_fetch_elapsed\":\"{:?}\"",
155 write_cache_fetch_elapsed
156 )?;
157 }
158 if !store_fetch_elapsed.is_zero() {
159 write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
160 }
161
162 write!(f, "}}")
163 }
164}
165
166impl ParquetFetchMetrics {
167 pub fn is_empty(&self) -> bool {
169 self.data.lock().unwrap().is_empty()
170 }
171
172 pub fn merge_from(&self, other: &ParquetFetchMetrics) {
174 let ParquetFetchMetricsData {
175 page_cache_hit,
176 write_cache_hit,
177 cache_miss,
178 pages_to_fetch_mem,
179 page_size_to_fetch_mem,
180 pages_to_fetch_write_cache,
181 page_size_to_fetch_write_cache,
182 pages_to_fetch_store,
183 page_size_to_fetch_store,
184 page_size_needed,
185 write_cache_fetch_elapsed,
186 store_fetch_elapsed,
187 total_fetch_elapsed,
188 } = *other.data.lock().unwrap();
189
190 let mut data = self.data.lock().unwrap();
191 data.page_cache_hit += page_cache_hit;
192 data.write_cache_hit += write_cache_hit;
193 data.cache_miss += cache_miss;
194 data.pages_to_fetch_mem += pages_to_fetch_mem;
195 data.page_size_to_fetch_mem += page_size_to_fetch_mem;
196 data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
197 data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
198 data.pages_to_fetch_store += pages_to_fetch_store;
199 data.page_size_to_fetch_store += page_size_to_fetch_store;
200 data.page_size_needed += page_size_needed;
201 data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
202 data.store_fetch_elapsed += store_fetch_elapsed;
203 data.total_fetch_elapsed += total_fetch_elapsed;
204 }
205}
206
/// Shared state for reading one parquet row group: the file metadata, the
/// optional page offset index, and the per-column data fetched so far.
pub(crate) struct RowGroupBase<'a> {
    /// Metadata of the whole parquet file this row group belongs to.
    parquet_metadata: &'a ParquetMetaData,
    /// Index of this row group within the file.
    row_group_idx: usize,
    /// Offset index (page locations) for this row group, when the file has one.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Fetched data per column chunk; `None` until the column is fetched.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group.
    pub(crate) row_count: usize,
}
215
impl<'a> RowGroupBase<'a> {
    /// Creates a base for row group `row_group_idx` of `parquet_meta`.
    ///
    /// Every column chunk slot starts empty; an empty offset index in the
    /// file metadata is normalized to `None`.
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // Treat an empty offset index as absent.
        let offset_index = parquet_meta
            .offset_index()
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            parquet_metadata: parquet_meta,
            row_group_idx,
            offset_index,
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    /// Computes the byte ranges needed for a page-granular (sparse) read.
    ///
    /// Considers only columns selected by `projection` whose data has not
    /// been fetched yet. Returns the ranges to read and, per such column,
    /// the start offsets of its ranges (consumed later by
    /// [`Self::assign_sparse_chunk`]).
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.row_group_metadata().columns())
            .enumerate()
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                let mut ranges = vec![];
                // If the first page starts after the chunk's byte start, also
                // read the leading bytes (presumably dictionary-page data —
                // TODO confirm against the writer layout).
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                // Byte ranges of the pages picked by the row selection.
                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                // NOTE: side effect inside `flat_map` — records this column's
                // range starts in lockstep with the flattened range list.
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    /// Installs data fetched for a sparse read into the column chunk slots.
    ///
    /// `data` and `page_start_offsets` must come from the same
    /// [`Self::calc_sparse_read_ranges`] call (same projection) so that
    /// columns and buffers line up in iteration order.
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            // Same skip condition as the range calculation, keeping the
            // iterators aligned with the columns actually fetched.
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                // One fetched buffer per recorded start offset.
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                let column = self
                    .parquet_metadata
                    .row_group(self.row_group_idx)
                    .column(idx);
                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: column.byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    /// Computes the full byte range of every selected, not-yet-fetched column.
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.row_group_metadata().column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Installs whole-chunk data fetched by a dense read.
    ///
    /// `chunk_data` must line up with the ranges returned by
    /// [`Self::calc_dense_read_ranges`] for the same projection.
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Out of buffers: leave the remaining slots unfetched.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self
                .parquet_metadata
                .row_group(self.row_group_idx)
                .column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Creates a page reader over the fetched data of column `col_idx`.
    ///
    /// # Errors
    /// Returns an error when the column has not been fetched.
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                // Pass page locations so the reader can navigate pages of a
                // sparse chunk; an empty index is treated as absent.
                let page_locations = self
                    .offset_index
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.row_group_metadata().column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }

    /// Metadata of the whole parquet file.
    pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData {
        self.parquet_metadata
    }

    /// Metadata of this row group.
    pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData {
        self.parquet_metadata().row_group(self.row_group_idx)
    }
}
390
/// An in-memory row group of one SST file, whose column chunks are fetched
/// through the layered cache (page cache, then write cache, then store).
pub struct InMemoryRowGroup<'a> {
    /// Region owning the file; part of the write-cache key.
    region_id: RegionId,
    /// Id of the SST file; part of both cache keys.
    file_id: FileId,
    /// Index of the row group inside the file.
    row_group_idx: usize,
    /// Cache layers used to serve and populate page reads.
    cache_strategy: CacheStrategy,
    /// Path of the file in the object store.
    file_path: &'a str,
    /// Object store used when both caches miss.
    object_store: ObjectStore,
    /// Shared row-group reading state (metadata, fetched chunks).
    base: RowGroupBase<'a>,
}
402
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a reader for row group `row_group_idx` of the file at
    /// `file_path`, using `cache_strategy` to serve reads when possible.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the data of the columns selected by `projection`.
    ///
    /// Performs a page-granular (sparse) read when both a row `selection`
    /// and the file's offset index are available; otherwise reads the
    /// selected column chunks in full. Fetch statistics are recorded into
    /// `metrics` when provided.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Yield before the potentially large dense fetch — presumably to
            // keep the async runtime responsive; TODO confirm intent.
            yield_now().await;

            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            // Nothing to fetch: all selected columns are already in memory.
            if fetch_ranges.is_empty() {
                return Ok(());
            }

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Fetches `ranges` of this file, trying the in-memory page cache, then
    /// the write cache, and finally the object store. Whatever was fetched
    /// is inserted back into the page cache for subsequent reads.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();

        // 1. In-memory page cache, keyed by (file, row group, ranges).
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Aligned size is handed to `PageValue` below (presumably for cache
        // weighting); the unaligned size is the merged-range byte count.
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);

        // 2. Local write cache, when configured.
        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // 3. Fall back to the object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();

                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Populate the page cache so repeat reads of the same ranges hit.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Reads `ranges` from the write cache's file cache, when a write cache
    /// is configured. Returns `None` on a miss or when there is no cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
557
558fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
564 if ranges.is_empty() {
565 return (0, 0);
566 }
567
568 let gap = MERGE_GAP as u64;
569 let mut sorted_ranges = ranges.to_vec();
570 sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
571
572 let mut total_size_aligned = 0;
573 let mut total_size_unaligned = 0;
574 let mut cur = sorted_ranges[0].clone();
575
576 for range in sorted_ranges.into_iter().skip(1) {
577 if range.start <= cur.end + gap {
578 cur.end = cur.end.max(range.end);
580 } else {
581 let range_size = cur.end - cur.start;
583 total_size_aligned += align_to_pooled_buf_size(range_size);
584 total_size_unaligned += range_size;
585 cur = range;
586 }
587 }
588
589 let range_size = cur.end - cur.start;
591 total_size_aligned += align_to_pooled_buf_size(range_size);
592 total_size_unaligned += range_size;
593
594 (total_size_aligned, total_size_unaligned)
595}
596
/// Rounds `size` up to a whole number of 2 MiB units (`POOLED_BUF_SIZE`),
/// presumably matching the fetch buffer pool's allocation unit.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.next_multiple_of(POOLED_BUF_SIZE)
}
605
606impl RowGroups for InMemoryRowGroup<'_> {
607 fn num_rows(&self) -> usize {
608 self.base.row_count
609 }
610
611 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
612 let page_reader = self.base.column_reader(i)?;
614
615 Ok(Box::new(ColumnChunkIterator {
616 reader: Some(Ok(Box::new(page_reader))),
617 }))
618 }
619
620 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
621 Box::new(std::iter::once(self.base.row_group_metadata()))
622 }
623
624 fn metadata(&self) -> &ParquetMetaData {
625 self.base.parquet_metadata()
626 }
627}
628
/// Fetched data of a single column chunk.
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Only a subset of the chunk's pages was fetched (page-granular reads
    /// driven by a row selection).
    Sparse {
        /// Total length of the column chunk in the file, in bytes.
        length: usize,
        /// Fetched pieces as `(absolute byte offset, data)` pairs, expected
        /// sorted by offset (lookups binary-search on it).
        data: Vec<(usize, Bytes)>,
    },
    /// The whole chunk, fetched in one buffer starting at `offset`.
    Dense { offset: usize, data: Bytes },
}
643
644impl ColumnChunkData {
645 fn get(&self, start: u64) -> Result<Bytes> {
646 match &self {
647 ColumnChunkData::Sparse { data, .. } => data
648 .binary_search_by_key(&start, |(offset, _)| *offset as u64)
649 .map(|idx| data[idx].1.clone())
650 .map_err(|_| {
651 ParquetError::General(format!(
652 "Invalid offset in sparse column chunk data: {start}"
653 ))
654 }),
655 ColumnChunkData::Dense { offset, data } => {
656 let start = start as usize - *offset;
657 Ok(data.slice(start..))
658 }
659 }
660 }
661}
662
663impl Length for ColumnChunkData {
664 fn len(&self) -> u64 {
665 match &self {
666 ColumnChunkData::Sparse { length, .. } => *length as u64,
667 ColumnChunkData::Dense { data, .. } => data.len() as u64,
668 }
669 }
670}
671
672impl ChunkReader for ColumnChunkData {
673 type T = bytes::buf::Reader<Bytes>;
674
675 fn get_read(&self, start: u64) -> Result<Self::T> {
676 Ok(self.get(start)?.reader())
677 }
678
679 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
680 Ok(self.get(start)?.slice(..length))
681 }
682}
683
/// An iterator that yields at most one page reader, then `None`.
pub(crate) struct ColumnChunkIterator {
    /// The single item to yield; taken out on the first `next()` call.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

// Marker trait impl required by the parquet arrow reader.
impl PageIterator for ColumnChunkIterator {}