mito2/sst/parquet/row_group.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).

use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::RegionId;
use tokio::task::yield_now;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::FileId;
use crate::sst::parquet::helper::{fetch_byte_ranges, MERGE_GAP};

pub(crate) struct RowGroupBase<'a> {
    metadata: &'a RowGroupMetaData,
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed chunk data of each column.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    pub(crate) row_count: usize,
}

impl<'a> RowGroupBase<'a> {
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` if we don't set
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // Filter out empty offset indexes (old versions wrote `Some(vec![])` when none was present).
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            metadata,
            offset_index,
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

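    /// Calculates the byte ranges to fetch when both a [RowSelection] and an
    /// offset index are available, so only the pages needed by the selection
    /// are read. Returns the ranges together with the page start offsets of
    /// each fetched column.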
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`.
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.metadata.columns())
            .enumerate()
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

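    /// Assigns the sparse chunk data fetched for `page_start_offsets` to
    /// [RowGroupBase::column_chunks].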
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: self.metadata.column(idx).byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

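    /// Calculates the byte ranges of all projected columns that have not been
    /// fetched yet, covering each column chunk in full.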
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.metadata.column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks]
    /// for the columns selected by the `projection`.
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Get the fetched chunk data.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self.metadata.column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Creates a [PageReader] from [RowGroupBase::column_chunks].
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )))
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // Filter out empty offset indexes (old versions wrote `Some(vec![])` when none was present).
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }
}

/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    region_id: RegionId,
    file_id: FileId,
    row_group_idx: usize,
    cache_strategy: CacheStrategy,
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    base: RowGroupBase<'a>,
}

impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] for the row group at `row_group_idx`.
    ///
    /// # Panics
    /// Panics if the `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory.
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime, since reading pages
            // from the page cache is a synchronous, CPU-bound operation.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data for the computed ranges.
            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;

            // Assign fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Tries to fetch data from the memory cache or the `WriteCache`;
    /// if the data is in neither cache, fetches it from the object store directly.
    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        // The fetch-pages timer now includes the whole time spent reading pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            return Ok(pages.compressed.clone());
        }

        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
            Some(data) => data,
            None => {
                // Fetch data from the object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();
                fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?
            }
        };

        // Put the pages back into the cache.
        let total_range_size = compute_total_range_size(ranges);
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from the write cache.
    /// Returns `None` if the data is not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}

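// A hypothetical usage sketch (illustrative names, not part of the original
// source): a caller creates the row group, fetches the projected columns, and
// then drives decoding through the `RowGroups` trait implemented below, e.g.
// with `parquet`'s `ParquetRecordBatchReader::try_new_with_row_groups`:
//
//     let mut row_group = InMemoryRowGroup::create(
//         region_id, file_id, &parquet_meta, row_group_idx, cache_strategy,
//         file_path, object_store,
//     );
//     row_group.fetch(&projection, selection.as_ref()).await?;
//     let reader = ParquetRecordBatchReader::try_new_with_row_groups(
//         &field_levels, &row_group, batch_size, selection,
//     )?;
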
/// Computes the max possible buffer size to read the given `ranges`.
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
    if ranges.is_empty() {
        return 0;
    }

    let gap = MERGE_GAP as u64;
    let mut sorted_ranges = ranges.to_vec();
    sorted_ranges.sort_unstable_by_key(|range| range.start);

    let mut total_size = 0;
    let mut cur = sorted_ranges[0].clone();

    for range in sorted_ranges.into_iter().skip(1) {
        if range.start <= cur.end + gap {
            // There is an overlap or the gap is small enough to merge.
            cur.end = cur.end.max(range.end);
        } else {
            // No overlap and the gap is too large; add the current range to the total and start a new one.
            total_size += align_to_pooled_buf_size(cur.end - cur.start);
            cur = range;
        }
    }

    // Add the last range.
    total_size += align_to_pooled_buf_size(cur.end - cur.start);

    total_size
}

/// Aligns the given size up to a multiple of the pooled buffer size.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}

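// A minimal sanity-check sketch for the merge-and-align arithmetic above.
// These tests are hypothetical additions, not part of the original file; the
// overlapping-range case is chosen so the expected values do not depend on
// the value of `MERGE_GAP`.
#[cfg(test)]
mod buffer_size_tests {
    use super::*;

    const MIB: u64 = 1024 * 1024;

    #[test]
    fn align_rounds_up_to_2mib_multiples() {
        assert_eq!(align_to_pooled_buf_size(0), 0);
        assert_eq!(align_to_pooled_buf_size(1), 2 * MIB);
        assert_eq!(align_to_pooled_buf_size(2 * MIB), 2 * MIB);
        assert_eq!(align_to_pooled_buf_size(2 * MIB + 1), 4 * MIB);
    }

    #[test]
    fn overlapping_ranges_merge_before_alignment() {
        // No ranges means no buffer at all.
        assert_eq!(compute_total_range_size(&[]), 0);
        // [0, 100) and [50, 200) overlap, so they merge into [0, 200),
        // which is rounded up to a single 2 MiB pooled buffer.
        assert_eq!(compute_total_range_size(&[0..100, 50..200]), 2 * MIB);
    }
}
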
impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.base.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        // Creates a page reader to read the column at `i`.
        let page_reader = self.base.column_reader(i)?;

        Ok(Box::new(ColumnChunkIterator {
            reader: Some(Ok(Box::new(page_reader))),
        }))
    }
}

/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data)
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Returns the bytes starting at `start`. For sparse chunks, `start` must
    /// exactly match the recorded start offset of a fetched page.
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
pub(crate) struct ColumnChunkIterator {
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}