mito2/sst/parquet/
row_group.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
16
17use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::ProjectionMask;
23use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::reader::{ChunkReader, Length};
29use parquet::file::serialized_reader::SerializedPageReader;
30use store_api::storage::{FileId, RegionId};
31use tokio::task::yield_now;
32
33use crate::cache::file_cache::{FileType, IndexKey};
34use crate::cache::{CacheStrategy, PageKey, PageValue};
35use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
36use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
37
/// Inner data for [ParquetFetchMetrics].
///
/// Counters are accumulated by `InMemoryRowGroup::fetch_bytes`:
/// - the `*_hit` / `cache_miss` fields count whole fetch operations, not pages;
/// - the `page_size_to_fetch_*` fields record the bytes fetched from each
///   source (for write cache / store this is the merged-range total, which can
///   exceed the bytes actually requested);
/// - `page_size_needed` records the exact bytes the caller asked for.
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    /// Number of page cache hits.
    pub page_cache_hit: usize,
    /// Number of write cache hits.
    pub write_cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
    /// Number of pages to fetch from mem cache.
    pub pages_to_fetch_mem: usize,
    /// Total size in bytes of pages to fetch from mem cache.
    pub page_size_to_fetch_mem: u64,
    /// Number of pages to fetch from write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Total size in bytes of pages to fetch from write cache.
    pub page_size_to_fetch_write_cache: u64,
    /// Number of pages to fetch from store.
    pub pages_to_fetch_store: usize,
    /// Total size in bytes of pages to fetch from store.
    pub page_size_to_fetch_store: u64,
    /// Total size in bytes of pages actually returned.
    pub page_size_needed: u64,
    /// Elapsed time fetching from write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Elapsed time fetching from object store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total elapsed time for fetching row groups.
    pub total_fetch_elapsed: std::time::Duration,
}
68
69impl ParquetFetchMetricsData {
70    /// Returns true if the metrics are empty (contain no meaningful data).
71    fn is_empty(&self) -> bool {
72        self.total_fetch_elapsed.is_zero()
73    }
74}
75
/// Metrics for tracking page/row group fetch operations.
///
/// Wraps [ParquetFetchMetricsData] in a mutex so the counters can be updated
/// through a shared reference while a row group is being fetched.
#[derive(Default)]
pub struct ParquetFetchMetrics {
    /// Mutex-guarded counters; guards are held only for short, synchronous
    /// update sections.
    pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}
81
82impl std::fmt::Debug for ParquetFetchMetrics {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        let data = self.data.lock().unwrap();
85        if data.is_empty() {
86            return write!(f, "{{}}");
87        }
88
89        let ParquetFetchMetricsData {
90            page_cache_hit,
91            write_cache_hit,
92            cache_miss,
93            pages_to_fetch_mem,
94            page_size_to_fetch_mem,
95            pages_to_fetch_write_cache,
96            page_size_to_fetch_write_cache,
97            pages_to_fetch_store,
98            page_size_to_fetch_store,
99            page_size_needed,
100            write_cache_fetch_elapsed,
101            store_fetch_elapsed,
102            total_fetch_elapsed,
103        } = *data;
104
105        write!(f, "{{")?;
106
107        write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;
108
109        if page_cache_hit > 0 {
110            write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
111        }
112        if write_cache_hit > 0 {
113            write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
114        }
115        if cache_miss > 0 {
116            write!(f, ", \"cache_miss\":{}", cache_miss)?;
117        }
118        if pages_to_fetch_mem > 0 {
119            write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
120        }
121        if page_size_to_fetch_mem > 0 {
122            write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
123        }
124        if pages_to_fetch_write_cache > 0 {
125            write!(
126                f,
127                ", \"pages_to_fetch_write_cache\":{}",
128                pages_to_fetch_write_cache
129            )?;
130        }
131        if page_size_to_fetch_write_cache > 0 {
132            write!(
133                f,
134                ", \"page_size_to_fetch_write_cache\":{}",
135                page_size_to_fetch_write_cache
136            )?;
137        }
138        if pages_to_fetch_store > 0 {
139            write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
140        }
141        if page_size_to_fetch_store > 0 {
142            write!(
143                f,
144                ", \"page_size_to_fetch_store\":{}",
145                page_size_to_fetch_store
146            )?;
147        }
148        if page_size_needed > 0 {
149            write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
150        }
151        if !write_cache_fetch_elapsed.is_zero() {
152            write!(
153                f,
154                ", \"write_cache_fetch_elapsed\":\"{:?}\"",
155                write_cache_fetch_elapsed
156            )?;
157        }
158        if !store_fetch_elapsed.is_zero() {
159            write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
160        }
161
162        write!(f, "}}")
163    }
164}
165
166impl ParquetFetchMetrics {
167    /// Returns true if the metrics are empty (contain no meaningful data).
168    pub fn is_empty(&self) -> bool {
169        self.data.lock().unwrap().is_empty()
170    }
171
172    /// Merges metrics from another [ParquetFetchMetrics].
173    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
174        let ParquetFetchMetricsData {
175            page_cache_hit,
176            write_cache_hit,
177            cache_miss,
178            pages_to_fetch_mem,
179            page_size_to_fetch_mem,
180            pages_to_fetch_write_cache,
181            page_size_to_fetch_write_cache,
182            pages_to_fetch_store,
183            page_size_to_fetch_store,
184            page_size_needed,
185            write_cache_fetch_elapsed,
186            store_fetch_elapsed,
187            total_fetch_elapsed,
188        } = *other.data.lock().unwrap();
189
190        let mut data = self.data.lock().unwrap();
191        data.page_cache_hit += page_cache_hit;
192        data.write_cache_hit += write_cache_hit;
193        data.cache_miss += cache_miss;
194        data.pages_to_fetch_mem += pages_to_fetch_mem;
195        data.page_size_to_fetch_mem += page_size_to_fetch_mem;
196        data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
197        data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
198        data.pages_to_fetch_store += pages_to_fetch_store;
199        data.page_size_to_fetch_store += page_size_to_fetch_store;
200        data.page_size_needed += page_size_needed;
201        data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
202        data.store_fetch_elapsed += store_fetch_elapsed;
203        data.total_fetch_elapsed += total_fetch_elapsed;
204    }
205}
206
/// Shared state and fetch/assign logic for reading one row group of a
/// parquet file.
pub(crate) struct RowGroupBase<'a> {
    /// Metadata of the whole parquet file.
    parquet_metadata: &'a ParquetMetaData,
    /// Index of this row group within the file.
    row_group_idx: usize,
    /// Page offset index of this row group, if the file metadata carries one.
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed page of each column.
    /// `None` until the column's data is fetched and assigned.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    /// Number of rows in this row group.
    pub(crate) row_count: usize,
}
215
impl<'a> RowGroupBase<'a> {
    /// Creates a [RowGroupBase] for the row group at `row_group_idx` of
    /// `parquet_meta`, with no column data fetched yet.
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` if we don't set
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            parquet_metadata: parquet_meta,
            row_group_idx,
            offset_index,
            // One slot per column, filled lazily by the `assign_*_chunk` methods.
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    /// Computes the byte ranges needed for a sparse (page-level) read.
    ///
    /// Uses the page `offset_index` to fetch only the pages required by
    /// `selection`, for columns that are projected and not yet fetched.
    /// Returns the ranges to fetch plus, for each such column, the start
    /// offset of every fetched page; [RowGroupBase::assign_sparse_chunk]
    /// consumes both in the same order.
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.row_group_metadata().columns())
            .enumerate()
            // Skip columns that are already in memory or not projected.
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                // Record the page start offsets in the same order as `ranges`;
                // `assign_sparse_chunk` relies on this pairing.
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    /// Assigns fetched sparse page data to [RowGroupBase::column_chunks].
    ///
    /// `data` and `page_start_offsets` must be in the order produced by
    /// [RowGroupBase::calc_sparse_read_ranges] for the same `projection`.
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            // Same skip condition as in `calc_sparse_read_ranges`, keeping the
            // iterators aligned with the columns that were actually fetched.
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                // One fetched buffer per page offset of this column.
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                // Field access (not `row_group_metadata()`) because
                // `column_chunks` is mutably borrowed by the loop.
                let column = self
                    .parquet_metadata
                    .row_group(self.row_group_idx)
                    .column(idx);
                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: column.byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    /// Computes the full byte range of every projected column that has not
    /// been fetched yet (dense read, no page-level pruning).
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.row_group_metadata().column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data, in the order produced by
    /// [RowGroupBase::calc_dense_read_ranges], to the matching slots of
    /// [RowGroupBase::column_chunks].
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            // Same skip condition as in `calc_dense_read_ranges`, so the data
            // iterator stays aligned with the fetched columns.
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Get the fetched page.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            // Field access (not `row_group_metadata()`) because
            // `column_chunks` is mutably borrowed by the loop.
            let column = self
                .parquet_metadata
                .row_group(self.row_group_idx)
                .column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Create [PageReader] from [RowGroupBase::column_chunks].
    ///
    /// # Errors
    /// Returns an error if the column at `col_idx` was not fetched beforehand.
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.row_group_metadata().column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }

    /// Metadata of the whole parquet file.
    pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData {
        self.parquet_metadata
    }

    /// Metadata of this row group.
    pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData {
        self.parquet_metadata().row_group(self.row_group_idx)
    }
}
390
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    /// Region the SST file belongs to; part of the write-cache key.
    region_id: RegionId,
    /// Id of the SST file; part of the page/write-cache keys.
    file_id: FileId,
    /// Index of this row group within the parquet file.
    row_group_idx: usize,
    /// Caches consulted before falling back to the object store.
    cache_strategy: CacheStrategy,
    /// Path of the parquet file in the object store.
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    /// Shared fetch/assign logic and the fetched column chunks.
    base: RowGroupBase<'a>,
}
402
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] by `row_group_idx`.
    ///
    /// # Panics
    /// Panics if the `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory.
    ///
    /// When both a `selection` and a page offset index are available, only the
    /// pages needed by the selection are fetched (sparse read); otherwise the
    /// full byte range of every projected, not-yet-fetched column is fetched
    /// (dense read). Fetch statistics are recorded into `metrics` when given.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime, since the range
            // computation and page-cache lookup below are synchronous,
            // CPU-bound operations.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data with ranges
            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assigns fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Try to fetch data from the memory cache or the WriteCache,
    /// if not in WriteCache, fetch data from object store directly.
    ///
    /// Lookup order: in-memory page cache -> write cache -> object store.
    /// Whatever is fetched from the latter two is inserted back into the page
    /// cache so later reads of the same ranges hit memory.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        // Now fetch page timer includes the whole time to read pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();

        // Fast path: the exact (file, row group, ranges) key is in the page cache.
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Compute merged read sizes: the aligned size also weights the page
        // cache entry inserted below, the unaligned size feeds the metrics.
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);

        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        // Only time the write-cache lookup when metrics are requested.
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Fetch data from object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();

                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Put pages back to the cache.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from write cache.
    /// Returns `None` if the data is not in the cache, or if no write cache
    /// is configured.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
557
558/// Computes the max possible buffer size to read the given `ranges`.
559/// Returns (aligned_size, unaligned_size) where:
560/// - aligned_size: total size aligned to pooled buffer size
561/// - unaligned_size: actual total size without alignment
562// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
563fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
564    if ranges.is_empty() {
565        return (0, 0);
566    }
567
568    let gap = MERGE_GAP as u64;
569    let mut sorted_ranges = ranges.to_vec();
570    sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
571
572    let mut total_size_aligned = 0;
573    let mut total_size_unaligned = 0;
574    let mut cur = sorted_ranges[0].clone();
575
576    for range in sorted_ranges.into_iter().skip(1) {
577        if range.start <= cur.end + gap {
578            // There is an overlap or the gap is small enough to merge
579            cur.end = cur.end.max(range.end);
580        } else {
581            // No overlap and the gap is too large, add current range to total and start a new one
582            let range_size = cur.end - cur.start;
583            total_size_aligned += align_to_pooled_buf_size(range_size);
584            total_size_unaligned += range_size;
585            cur = range;
586        }
587    }
588
589    // Add the last range
590    let range_size = cur.end - cur.start;
591    total_size_aligned += align_to_pooled_buf_size(range_size);
592    total_size_unaligned += range_size;
593
594    (total_size_aligned, total_size_unaligned)
595}
596
/// Aligns the given size to the multiple of the pooled buffer size.
///
/// Zero stays zero; any other size is rounded up to the next 2 MiB boundary.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    // Buffer size of the backend's pooled reads (see the links above).
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    let remainder = size % POOLED_BUF_SIZE;
    if remainder == 0 {
        size
    } else {
        size - remainder + POOLED_BUF_SIZE
    }
}
605
impl RowGroups for InMemoryRowGroup<'_> {
    /// Number of rows in this row group.
    fn num_rows(&self) -> usize {
        self.base.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        // Creates a page reader to read column at `i`. Errors if the column
        // was not fetched into memory beforehand.
        let page_reader = self.base.column_reader(i)?;

        Ok(Box::new(ColumnChunkIterator {
            reader: Some(Ok(Box::new(page_reader))),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        // This collection always holds exactly the one row group it was built for.
        Box::new(std::iter::once(self.base.row_group_metadata()))
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.base.parquet_metadata()
    }
}
628
/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data). Expected to be sorted by offset, since
        /// lookups use a binary search over the offsets.
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    Dense { offset: usize, data: Bytes },
}
643
644impl ColumnChunkData {
645    fn get(&self, start: u64) -> Result<Bytes> {
646        match &self {
647            ColumnChunkData::Sparse { data, .. } => data
648                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
649                .map(|idx| data[idx].1.clone())
650                .map_err(|_| {
651                    ParquetError::General(format!(
652                        "Invalid offset in sparse column chunk data: {start}"
653                    ))
654                }),
655            ColumnChunkData::Dense { offset, data } => {
656                let start = start as usize - *offset;
657                Ok(data.slice(start..))
658            }
659        }
660    }
661}
662
663impl Length for ColumnChunkData {
664    fn len(&self) -> u64 {
665        match &self {
666            ColumnChunkData::Sparse { length, .. } => *length as u64,
667            ColumnChunkData::Dense { data, .. } => data.len() as u64,
668        }
669    }
670}
671
672impl ChunkReader for ColumnChunkData {
673    type T = bytes::buf::Reader<Bytes>;
674
675    fn get_read(&self, start: u64) -> Result<Self::T> {
676        Ok(self.get(start)?.reader())
677    }
678
679    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
680        Ok(self.get(start)?.slice(..length))
681    }
682}
683
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
pub(crate) struct ColumnChunkIterator {
    /// The page reader to hand out; `None` once it has been yielded.
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}
688
689impl Iterator for ColumnChunkIterator {
690    type Item = Result<Box<dyn PageReader>>;
691
692    fn next(&mut self) -> Option<Self::Item> {
693        self.reader.take()
694    }
695}
696
// Marker impl: the iterator above already yields the single `PageReader` of a
// column chunk, which is all the `PageIterator` contract requires here.
impl PageIterator for ColumnChunkIterator {}