mito2/sst/parquet/row_group.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).

use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::{FileId, RegionId};
use tokio::task::yield_now;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Inner data for [ParquetFetchMetrics].
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    /// Number of page cache hits.
    pub page_cache_hit: usize,
    /// Number of write cache hits.
    pub write_cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
    /// Number of pages to fetch from the memory cache.
    pub pages_to_fetch_mem: usize,
    /// Total size in bytes of pages to fetch from the memory cache.
    pub page_size_to_fetch_mem: u64,
    /// Number of pages to fetch from the write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Total size in bytes of pages to fetch from the write cache.
    pub page_size_to_fetch_write_cache: u64,
    /// Number of pages to fetch from the object store.
    pub pages_to_fetch_store: usize,
    /// Total size in bytes of pages to fetch from the object store.
    pub page_size_to_fetch_store: u64,
    /// Total size in bytes of pages actually returned.
    pub page_size_needed: u64,
    /// Elapsed time fetching from the write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Elapsed time fetching from the object store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total elapsed time for fetching row groups.
    pub total_fetch_elapsed: std::time::Duration,
}

impl ParquetFetchMetricsData {
    /// Returns true if the metrics are empty (contain no meaningful data).
    fn is_empty(&self) -> bool {
        self.total_fetch_elapsed.is_zero()
    }
}

/// Metrics for tracking page/row group fetch operations.
#[derive(Default)]
pub struct ParquetFetchMetrics {
    pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}

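// The `Debug` implementation below renders the metrics as a compact JSON-like
// object and omits zero-valued fields, e.g. (hypothetical values):
// {"total_fetch_elapsed":"1.8ms", "page_cache_hit":2, "pages_to_fetch_mem":8,
//  "page_size_to_fetch_mem":65536, "page_size_needed":65536}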
impl std::fmt::Debug for ParquetFetchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        if data.is_empty() {
            return write!(f, "{{}}");
        }

        let ParquetFetchMetricsData {
            page_cache_hit,
            write_cache_hit,
            cache_miss,
            pages_to_fetch_mem,
            page_size_to_fetch_mem,
            pages_to_fetch_write_cache,
            page_size_to_fetch_write_cache,
            pages_to_fetch_store,
            page_size_to_fetch_store,
            page_size_needed,
            write_cache_fetch_elapsed,
            store_fetch_elapsed,
            total_fetch_elapsed,
        } = *data;

        write!(f, "{{")?;

        write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;

        if page_cache_hit > 0 {
            write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
        }
        if write_cache_hit > 0 {
            write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
        }
        if cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", cache_miss)?;
        }
        if pages_to_fetch_mem > 0 {
            write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
        }
        if page_size_to_fetch_mem > 0 {
            write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
        }
        if pages_to_fetch_write_cache > 0 {
            write!(
                f,
                ", \"pages_to_fetch_write_cache\":{}",
                pages_to_fetch_write_cache
            )?;
        }
        if page_size_to_fetch_write_cache > 0 {
            write!(
                f,
                ", \"page_size_to_fetch_write_cache\":{}",
                page_size_to_fetch_write_cache
            )?;
        }
        if pages_to_fetch_store > 0 {
            write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
        }
        if page_size_to_fetch_store > 0 {
            write!(
                f,
                ", \"page_size_to_fetch_store\":{}",
                page_size_to_fetch_store
            )?;
        }
        if page_size_needed > 0 {
            write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
        }
        if !write_cache_fetch_elapsed.is_zero() {
            write!(
                f,
                ", \"write_cache_fetch_elapsed\":\"{:?}\"",
                write_cache_fetch_elapsed
            )?;
        }
        if !store_fetch_elapsed.is_zero() {
            write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
        }

        write!(f, "}}")
    }
}

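// A sketch of the intended aggregation pattern (the surrounding scan code is
// hypothetical): each reader task records into its own `ParquetFetchMetrics`,
// and a scan-level collector folds finished tasks into one aggregate:
//
//   let scan_metrics = ParquetFetchMetrics::default();
//   for task_metrics in finished_tasks {
//       scan_metrics.merge_from(&task_metrics);
//   }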
impl ParquetFetchMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    pub fn is_empty(&self) -> bool {
        self.data.lock().unwrap().is_empty()
    }

    /// Merges metrics from another [ParquetFetchMetrics].
    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
        let ParquetFetchMetricsData {
            page_cache_hit,
            write_cache_hit,
            cache_miss,
            pages_to_fetch_mem,
            page_size_to_fetch_mem,
            pages_to_fetch_write_cache,
            page_size_to_fetch_write_cache,
            pages_to_fetch_store,
            page_size_to_fetch_store,
            page_size_needed,
            write_cache_fetch_elapsed,
            store_fetch_elapsed,
            total_fetch_elapsed,
        } = *other.data.lock().unwrap();

        let mut data = self.data.lock().unwrap();
        data.page_cache_hit += page_cache_hit;
        data.write_cache_hit += write_cache_hit;
        data.cache_miss += cache_miss;
        data.pages_to_fetch_mem += pages_to_fetch_mem;
        data.page_size_to_fetch_mem += page_size_to_fetch_mem;
        data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
        data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
        data.pages_to_fetch_store += pages_to_fetch_store;
        data.page_size_to_fetch_store += page_size_to_fetch_store;
        data.page_size_needed += page_size_needed;
        data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
        data.store_fetch_elapsed += store_fetch_elapsed;
        data.total_fetch_elapsed += total_fetch_elapsed;
    }
}

pub(crate) struct RowGroupBase<'a> {
    metadata: &'a RowGroupMetaData,
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed chunk data of each column.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    pub(crate) row_count: usize,
}

impl<'a> RowGroupBase<'a> {
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` unless
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // is set to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // Filter out empty offset indexes (old versions wrote `Some(vec![])` when none was present).
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            metadata,
            offset_index,
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.metadata.columns())
            .enumerate()
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: self.metadata.column(idx).byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.metadata.column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks].
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Get the fetched data for this column.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self.metadata.column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Creates a [PageReader] from [RowGroupBase::column_chunks].
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // Filter out empty offset indexes (old versions wrote `Some(vec![])` when none was present).
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }
}

/// An in-memory collection of column chunks.
pub struct InMemoryRowGroup<'a> {
    region_id: RegionId,
    file_id: FileId,
    row_group_idx: usize,
    cache_strategy: CacheStrategy,
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    base: RowGroupBase<'a>,
}

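// A minimal usage sketch (the bindings passed to `create` are assumed to come
// from the surrounding scan code): fetch the projected columns of one row
// group, then serve pages through the `RowGroups` impl below.
//
//   let mut row_group = InMemoryRowGroup::create(
//       region_id, file_id, &parquet_meta, row_group_idx,
//       cache_strategy, file_path, object_store,
//   );
//   row_group.fetch(&projection, row_selection.as_ref(), None).await?;
//   // `&row_group` can now feed a parquet record batch reader via `RowGroups`.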
impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] for the row group at `row_group_idx`.
    ///
    /// # Panics
    /// Panics if `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime, since `fetch_pages_from_cache`
            // is a synchronous, CPU-bound operation.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data with ranges.
            let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;

            // Assign fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Tries to fetch data from the memory cache or the write cache; if neither
    /// contains it, fetches the data from the object store directly.
    async fn fetch_bytes(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Vec<Bytes>> {
        // The fetch-pages timer covers the whole time spent reading pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();

        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            if let Some(metrics) = metrics {
                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                let mut metrics_data = metrics.data.lock().unwrap();
                metrics_data.page_cache_hit += 1;
                metrics_data.pages_to_fetch_mem += ranges.len();
                metrics_data.page_size_to_fetch_mem += total_size;
                metrics_data.page_size_needed += total_size;
            }
            return Ok(pages.compressed.clone());
        }

        // Calculate total range size for metrics.
        let (total_range_size, unaligned_size) = compute_total_range_size(ranges);

        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
        let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
        let pages = match write_cache_result {
            Some(data) => {
                if let Some(metrics) = metrics {
                    let elapsed = fetch_write_cache_start
                        .map(|start| start.elapsed())
                        .unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.write_cache_fetch_elapsed += elapsed;
                    metrics_data.write_cache_hit += 1;
                    metrics_data.pages_to_fetch_write_cache += ranges.len();
                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
            None => {
                // Fetch data from the object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();

                let start = metrics.map(|_| std::time::Instant::now());
                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?;
                if let Some(metrics) = metrics {
                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
                    let mut metrics_data = metrics.data.lock().unwrap();
                    metrics_data.store_fetch_elapsed += elapsed;
                    metrics_data.cache_miss += 1;
                    metrics_data.pages_to_fetch_store += ranges.len();
                    metrics_data.page_size_to_fetch_store += unaligned_size;
                    metrics_data.page_size_needed += range_size_needed;
                }
                data
            }
        };

        // Put the pages back into the cache.
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from the write cache.
    /// Returns `None` if the data is not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}

/// Computes the max possible buffer size needed to read the given `ranges`.
/// Returns `(aligned_size, unaligned_size)` where:
/// - `aligned_size`: total size aligned to the pooled buffer size
/// - `unaligned_size`: actual total size without alignment
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
    if ranges.is_empty() {
        return (0, 0);
    }

    let gap = MERGE_GAP as u64;
    let mut sorted_ranges = ranges.to_vec();
    sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));

    let mut total_size_aligned = 0;
    let mut total_size_unaligned = 0;
    let mut cur = sorted_ranges[0].clone();

    for range in sorted_ranges.into_iter().skip(1) {
        if range.start <= cur.end + gap {
            // There is an overlap, or the gap is small enough to merge.
            cur.end = cur.end.max(range.end);
        } else {
            // No overlap and the gap is too large; add the current range to the
            // totals and start a new one.
            let range_size = cur.end - cur.start;
            total_size_aligned += align_to_pooled_buf_size(range_size);
            total_size_unaligned += range_size;
            cur = range;
        }
    }

    // Add the last range.
    let range_size = cur.end - cur.start;
    total_size_aligned += align_to_pooled_buf_size(range_size);
    total_size_unaligned += range_size;

    (total_size_aligned, total_size_unaligned)
}
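// Worked example for `compute_total_range_size` (a sketch, assuming
// `MERGE_GAP >= 50`): ranges `0..100` and `150..200` merge into a single read
// `0..200`, so `unaligned_size` is 200 and `aligned_size` is
// `align_to_pooled_buf_size(200)`, i.e. 2 MiB.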

/// Aligns the given size to a multiple of the pooled buffer size.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.base.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        // Creates a page reader to read the column at `i`.
        let page_reader = self.base.column_reader(i)?;

        Ok(Box::new(ColumnChunkIterator {
            reader: Some(Ok(Box::new(page_reader))),
        }))
    }
}

/// An in-memory column chunk.
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages.
    Sparse {
        /// Length of the full column chunk.
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a
        /// tuple of (page offset, page data).
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset.
    Dense { offset: usize, data: Bytes },
}

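// `ColumnChunkData::get` resolves a file offset to bytes: a `Sparse` chunk only
// serves reads that start exactly at a fetched page offset (for example, with
// pages at offsets 0 and 4096, `get(4096)` returns the second page while
// `get(100)` is an error), whereas a `Dense` chunk serves any offset at or
// after its start.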
impl ColumnChunkData {
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`].
pub(crate) struct ColumnChunkIterator {
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}
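
#[cfg(test)]
mod range_size_tests {
    //! A minimal test sketch for the size computations above. It assumes only
    //! that `MERGE_GAP` fits in a `u64` and keeps the exact constant symbolic.
    use super::*;

    #[test]
    fn align_rounds_up_to_2mib_multiples() {
        const MIB2: u64 = 2 * 1024 * 1024;
        assert_eq!(align_to_pooled_buf_size(0), 0);
        assert_eq!(align_to_pooled_buf_size(1), MIB2);
        assert_eq!(align_to_pooled_buf_size(MIB2), MIB2);
        assert_eq!(align_to_pooled_buf_size(MIB2 + 1), 2 * MIB2);
    }

    #[test]
    fn ranges_within_merge_gap_are_merged() {
        let gap = MERGE_GAP as u64;
        // Two ranges separated by exactly `gap` bytes merge into a single read.
        let (aligned, unaligned) = compute_total_range_size(&[0..10, (10 + gap)..(20 + gap)]);
        assert_eq!(unaligned, 20 + gap);
        assert_eq!(aligned, align_to_pooled_buf_size(20 + gap));
    }
}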