mito2/sst/parquet/row_group.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).

use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::{FileId, RegionId};
use tokio::task::yield_now;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};

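/// Base state for reading the column chunks of one row group into memory,
/// shared by the readers in this module; ported from the private structs
/// referenced in the module docs.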
pub(crate) struct RowGroupBase<'a> {
    metadata: &'a RowGroupMetaData,
    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Compressed page data of each column.
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    pub(crate) row_count: usize,
}

impl<'a> RowGroupBase<'a> {
    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
        let metadata = parquet_meta.row_group(row_group_idx);
        // `offset_index` is always `None` if we don't set
        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
        // to `true`.
        let offset_index = parquet_meta
            .offset_index()
            // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
            .filter(|index| !index.is_empty())
            .map(|x| x[row_group_idx].as_slice());

        Self {
            metadata,
            offset_index,
            column_chunks: vec![None; metadata.columns().len()],
            row_count: metadata.num_rows() as usize,
        }
    }

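    /// Calculates the byte ranges to fetch when both a [RowSelection] and an
    /// offset index are available, returning the ranges together with the page
    /// start offsets of each fetched column.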
    pub(crate) fn calc_sparse_read_ranges(
        &self,
        projection: &ProjectionMask,
        offset_index: &[OffsetIndexMetaData],
        selection: &RowSelection,
    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
        // `RowSelection`
        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
        let ranges = self
            .column_chunks
            .iter()
            .zip(self.metadata.columns())
            .enumerate()
            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
            .flat_map(|(idx, (_chunk, chunk_meta))| {
                // If the first page does not start at the beginning of the column,
                // then we need to also fetch a dictionary page.
                let mut ranges = vec![];
                let (start, _len) = chunk_meta.byte_range();
                match offset_index[idx].page_locations.first() {
                    Some(first) if first.offset as u64 != start => {
                        ranges.push(start..first.offset as u64);
                    }
                    _ => (),
                }

                ranges.extend(
                    selection
                        .scan_ranges(&offset_index[idx].page_locations)
                        .iter()
                        .map(|range| range.start..range.end),
                );
                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());

                ranges
            })
            .collect::<Vec<_>>();
        (ranges, page_start_offsets)
    }

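    /// Assigns the fetched page data to the projected columns as sparse chunks,
    /// keyed by the page start offsets returned from
    /// [RowGroupBase::calc_sparse_read_ranges].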
    pub(crate) fn assign_sparse_chunk(
        &mut self,
        projection: &ProjectionMask,
        data: Vec<Bytes>,
        page_start_offsets: Vec<Vec<usize>>,
    ) {
        let mut page_start_offsets = page_start_offsets.into_iter();
        let mut chunk_data = data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            if let Some(offsets) = page_start_offsets.next() {
                let mut chunks = Vec::with_capacity(offsets.len());
                for _ in 0..offsets.len() {
                    chunks.push(chunk_data.next().unwrap());
                }

                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                    length: self.metadata.column(idx).byte_range().1 as usize,
                    data: offsets.into_iter().zip(chunks).collect(),
                }))
            }
        }
    }

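    /// Calculates the byte ranges that cover the full chunks of the projected
    /// columns that have not been fetched yet.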
    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
        self.column_chunks
            .iter()
            .enumerate()
            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
            .map(|(idx, _chunk)| {
                let column = self.metadata.column(idx);
                let (start, length) = column.byte_range();
                start..(start + length)
            })
            .collect::<Vec<_>>()
    }

    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks]
    /// for the projected columns that have not been fetched yet.
    pub(crate) fn assign_dense_chunk(
        &mut self,
        projection: &ProjectionMask,
        chunk_data: Vec<Bytes>,
    ) {
        let mut chunk_data = chunk_data.into_iter();

        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
            if chunk.is_some() || !projection.leaf_included(idx) {
                continue;
            }

            // Get the fetched page.
            let Some(data) = chunk_data.next() else {
                continue;
            };

            let column = self.metadata.column(idx);
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: column.byte_range().0 as usize,
                data,
            }));
        }
    }

    /// Creates a [PageReader] for the column at `col_idx` from [RowGroupBase::column_chunks].
    pub(crate) fn column_reader(
        &self,
        col_idx: usize,
    ) -> Result<SerializedPageReader<ColumnChunkData>> {
        let page_reader = match &self.column_chunks[col_idx] {
            None => {
                return Err(ParquetError::General(format!(
                    "Invalid column index {col_idx}, column was not fetched"
                )));
            }
            Some(data) => {
                let page_locations = self
                    .offset_index
                    // filter out empty offset indexes (old versions specified Some(vec![]) when not present)
                    .filter(|index| !index.is_empty())
                    .map(|index| index[col_idx].page_locations.clone());
                SerializedPageReader::new(
                    data.clone(),
                    self.metadata.column(col_idx),
                    self.row_count,
                    page_locations,
                )?
            }
        };

        Ok(page_reader)
    }
}

/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
    region_id: RegionId,
    file_id: FileId,
    row_group_idx: usize,
    cache_strategy: CacheStrategy,
    file_path: &'a str,
    /// Object store.
    object_store: ObjectStore,
    base: RowGroupBase<'a>,
}

impl<'a> InMemoryRowGroup<'a> {
    /// Creates a new [InMemoryRowGroup] for the row group at `row_group_idx`.
    ///
    /// # Panics
    /// Panics if the `row_group_idx` is invalid.
    pub fn create(
        region_id: RegionId,
        file_id: FileId,
        parquet_meta: &'a ParquetMetaData,
        row_group_idx: usize,
        cache_strategy: CacheStrategy,
        file_path: &'a str,
        object_store: ObjectStore,
    ) -> Self {
        Self {
            region_id,
            file_id,
            row_group_idx,
            cache_strategy,
            file_path,
            object_store,
            base: RowGroupBase::new(parquet_meta, row_group_idx),
        }
    }

    /// Fetches the necessary column data into memory
    pub async fn fetch(
        &mut self,
        projection: &ProjectionMask,
        selection: Option<&RowSelection>,
    ) -> Result<()> {
        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
            let (fetch_ranges, page_start_offsets) =
                self.base
                    .calc_sparse_read_ranges(projection, offset_index, selection);

            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
            // Assign sparse chunk data to base.
            self.base
                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
        } else {
            // Release the CPU to avoid blocking the runtime, since fetching pages
            // from the cache is a synchronous, CPU-bound operation.
            yield_now().await;

            // Calculate ranges to read.
            let fetch_ranges = self.base.calc_dense_read_ranges(projection);

            if fetch_ranges.is_empty() {
                // Nothing to fetch.
                return Ok(());
            }

            // Fetch data with ranges
            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;

            // Assigns fetched data to base.
            self.base.assign_dense_chunk(projection, chunk_data);
        }

        Ok(())
    }

    /// Tries to fetch data from the memory cache or the WriteCache.
    /// If the data is in neither cache, fetches it from the object store directly.
    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        // The fetch-pages timer covers the whole time spent reading pages.
        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
            return Ok(pages.compressed.clone());
        }

        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
        let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
            Some(data) => data,
            None => {
                // Fetch data from object store.
                let _timer = READ_STAGE_ELAPSED
                    .with_label_values(&["cache_miss_read"])
                    .start_timer();

                fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                    .await
                    .map_err(|e| ParquetError::External(Box::new(e)))?
            }
        };

        // Put pages back to the cache.
        let total_range_size = compute_total_range_size(ranges);
        let page_value = PageValue::new(pages.clone(), total_range_size);
        self.cache_strategy
            .put_pages(page_key, Arc::new(page_value));

        Ok(pages)
    }

    /// Fetches data from the write cache.
    /// Returns `None` if the data is not in the cache.
    async fn fetch_ranges_from_write_cache(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        if let Some(cache) = self.cache_strategy.write_cache() {
            return cache.file_cache().read_ranges(key, ranges).await;
        }
        None
    }
}
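
// A minimal usage sketch (not part of the original file) showing how a reader
// could drive `InMemoryRowGroup`: create it for one row group, fetch the
// projected columns, then obtain a page iterator via the `RowGroups` trait.
// The helper name and its parameters are illustrative; all inputs are assumed
// to be supplied by the caller.
#[allow(dead_code)]
async fn in_memory_row_group_sketch(
    region_id: RegionId,
    file_id: FileId,
    parquet_meta: &ParquetMetaData,
    cache_strategy: CacheStrategy,
    file_path: &str,
    object_store: ObjectStore,
    projection: &ProjectionMask,
) -> Result<Box<dyn PageIterator>> {
    // Read row group 0; pass a `RowSelection` instead of `None` to fetch only
    // the pages required by the selection.
    let mut row_group = InMemoryRowGroup::create(
        region_id,
        file_id,
        parquet_meta,
        0,
        cache_strategy,
        file_path,
        object_store,
    );
    row_group.fetch(projection, None).await?;
    // `column_chunks` comes from the `RowGroups` trait implemented below.
    row_group.column_chunks(0)
}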

/// Computes the max possible buffer size to read the given `ranges`.
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
    if ranges.is_empty() {
        return 0;
    }

    let gap = MERGE_GAP as u64;
    let mut sorted_ranges = ranges.to_vec();
    sorted_ranges.sort_unstable_by_key(|range| range.start);

    let mut total_size = 0;
    let mut cur = sorted_ranges[0].clone();

    for range in sorted_ranges.into_iter().skip(1) {
        if range.start <= cur.end + gap {
            // There is an overlap or the gap is small enough to merge
            cur.end = cur.end.max(range.end);
        } else {
            // No overlap and the gap is too large, add current range to total and start a new one
            total_size += align_to_pooled_buf_size(cur.end - cur.start);
            cur = range;
        }
    }

    // Add the last range
    total_size += align_to_pooled_buf_size(cur.end - cur.start);

    total_size
}

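// A hedged sanity check (not part of the original file) for the merge logic
// above. It only assumes `MERGE_GAP` is far smaller than 100 MiB: overlapping
// ranges merge before alignment, while distant ranges align separately.
#[cfg(test)]
mod total_range_size_tests {
    use super::*;

    const BUF: u64 = 2 * 1024 * 1024;

    #[test]
    fn overlapping_ranges_merge_before_alignment() {
        // 0..10 and 5..20 overlap, so they merge into 0..20 and round up to
        // one pooled buffer.
        assert_eq!(compute_total_range_size(&[0..10, 5..20]), BUF);
    }

    #[test]
    fn distant_ranges_are_aligned_separately() {
        // 100 MiB apart: assuming no plausible MERGE_GAP bridges this, each
        // range rounds up to its own pooled buffer.
        let far = 100 * 1024 * 1024;
        assert_eq!(compute_total_range_size(&[0..10, far..far + 10]), 2 * BUF);
    }

    #[test]
    fn empty_input_needs_no_buffer() {
        assert_eq!(compute_total_range_size(&[]), 0);
    }
}
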
/// Aligns the given size up to a multiple of the pooled buffer size.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}
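
// A small sanity check (not part of the original file): sizes round up to
// whole 2 MiB pooled buffers, and zero stays zero.
#[cfg(test)]
mod align_tests {
    use super::*;

    #[test]
    fn rounds_up_to_pooled_buf_size() {
        const BUF: u64 = 2 * 1024 * 1024;
        assert_eq!(align_to_pooled_buf_size(0), 0);
        assert_eq!(align_to_pooled_buf_size(1), BUF);
        assert_eq!(align_to_pooled_buf_size(BUF), BUF);
        assert_eq!(align_to_pooled_buf_size(BUF + 1), 2 * BUF);
    }
}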

impl RowGroups for InMemoryRowGroup<'_> {
    fn num_rows(&self) -> usize {
        self.base.row_count
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        // Creates a page reader to read column at `i`.
        let page_reader = self.base.column_reader(i)?;

        Ok(Box::new(ColumnChunkIterator {
            reader: Some(Ok(Box::new(page_reader))),
        }))
    }
}

/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
    /// Column chunk data representing only a subset of data pages
    Sparse {
        /// Length of the full column chunk
        length: usize,
        /// Set of data pages included in this sparse chunk. Each element is a tuple
        /// of (page offset, page data)
        data: Vec<(usize, Bytes)>,
    },
    /// Full column chunk and its offset
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    fn get(&self, start: u64) -> Result<Bytes> {
        match &self {
            ColumnChunkData::Sparse { data, .. } => data
                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
                .map(|idx| data[idx].1.clone())
                .map_err(|_| {
                    ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))
                }),
            ColumnChunkData::Dense { offset, data } => {
                let start = start as usize - *offset;
                Ok(data.slice(start..))
            }
        }
    }
}
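
// A brief sanity check (not part of the original file) for `get`: sparse
// chunks only serve reads that start exactly at a fetched page offset, while
// dense chunks slice from any offset inside the chunk.
#[cfg(test)]
mod column_chunk_data_tests {
    use super::*;

    #[test]
    fn sparse_serves_exact_page_offsets_only() {
        let chunk = ColumnChunkData::Sparse {
            length: 64,
            data: vec![(0, Bytes::from_static(b"dict")), (16, Bytes::from_static(b"page"))],
        };
        assert_eq!(chunk.get(16).unwrap(), Bytes::from_static(b"page"));
        // 8 is inside the chunk but does not start a fetched page.
        assert!(chunk.get(8).is_err());
    }

    #[test]
    fn dense_slices_from_any_offset() {
        let chunk = ColumnChunkData::Dense {
            offset: 100,
            data: Bytes::from_static(b"hello"),
        };
        assert_eq!(chunk.get(102).unwrap(), Bytes::from_static(b"llo"));
    }
}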

impl Length for ColumnChunkData {
    fn len(&self) -> u64 {
        match &self {
            ColumnChunkData::Sparse { length, .. } => *length as u64,
            ColumnChunkData::Dense { data, .. } => data.len() as u64,
        }
    }
}

impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.get(start)?.reader())
    }

    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        Ok(self.get(start)?.slice(..length))
    }
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
pub(crate) struct ColumnChunkIterator {
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        self.reader.take()
    }
}

impl PageIterator for ColumnChunkIterator {}