mito2/sst/parquet/
row_group.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
16
17use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::{Buf, Bytes};
21use object_store::ObjectStore;
22use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
23use parquet::arrow::ProjectionMask;
24use parquet::column::page::{PageIterator, PageReader};
25use parquet::errors::{ParquetError, Result};
26use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
27use parquet::file::page_index::offset_index::OffsetIndexMetaData;
28use parquet::file::properties::DEFAULT_PAGE_SIZE;
29use parquet::file::reader::{ChunkReader, Length};
30use parquet::file::serialized_reader::SerializedPageReader;
31use store_api::storage::RegionId;
32use tokio::task::yield_now;
33
34use crate::cache::file_cache::{FileType, IndexKey};
35use crate::cache::{CacheStrategy, PageKey, PageValue};
36use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
37use crate::sst::file::FileId;
38use crate::sst::parquet::helper::fetch_byte_ranges;
39use crate::sst::parquet::page_reader::RowGroupCachedReader;
40
41pub(crate) struct RowGroupBase<'a> {
42    metadata: &'a RowGroupMetaData,
43    pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
44    /// Compressed page of each column.
45    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
46    pub(crate) row_count: usize,
47    /// Row group level cached pages for each column.
48    ///
49    /// These pages are uncompressed pages of a row group.
50    /// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
51    column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
52}
53
54impl<'a> RowGroupBase<'a> {
55    pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
56        let metadata = parquet_meta.row_group(row_group_idx);
57        // `offset_index` is always `None` if we don't set
58        // [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
59        // to `true`.
60        let offset_index = parquet_meta
61            .offset_index()
62            // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
63            .filter(|index| !index.is_empty())
64            .map(|x| x[row_group_idx].as_slice());
65
66        Self {
67            metadata,
68            offset_index,
69            column_chunks: vec![None; metadata.columns().len()],
70            row_count: metadata.num_rows() as usize,
71            column_uncompressed_pages: vec![None; metadata.columns().len()],
72        }
73    }
74
75    pub(crate) fn calc_sparse_read_ranges(
76        &self,
77        projection: &ProjectionMask,
78        offset_index: &[OffsetIndexMetaData],
79        selection: &RowSelection,
80    ) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
81        // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
82        // `RowSelection`
83        let mut page_start_offsets: Vec<Vec<usize>> = vec![];
84        let ranges = self
85            .column_chunks
86            .iter()
87            .zip(self.metadata.columns())
88            .enumerate()
89            .filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
90            .flat_map(|(idx, (_chunk, chunk_meta))| {
91                // If the first page does not start at the beginning of the column,
92                // then we need to also fetch a dictionary page.
93                let mut ranges = vec![];
94                let (start, _len) = chunk_meta.byte_range();
95                match offset_index[idx].page_locations.first() {
96                    Some(first) if first.offset as u64 != start => {
97                        ranges.push(start..first.offset as u64);
98                    }
99                    _ => (),
100                }
101
102                ranges.extend(
103                    selection
104                        .scan_ranges(&offset_index[idx].page_locations)
105                        .iter()
106                        .map(|range| range.start as u64..range.end as u64),
107                );
108                page_start_offsets.push(ranges.iter().map(|range| range.start as usize).collect());
109
110                ranges
111            })
112            .collect::<Vec<_>>();
113        (ranges, page_start_offsets)
114    }
115
116    pub(crate) fn assign_sparse_chunk(
117        &mut self,
118        projection: &ProjectionMask,
119        data: Vec<Bytes>,
120        page_start_offsets: Vec<Vec<usize>>,
121    ) {
122        let mut page_start_offsets = page_start_offsets.into_iter();
123        let mut chunk_data = data.into_iter();
124
125        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
126            if chunk.is_some() || !projection.leaf_included(idx) {
127                continue;
128            }
129
130            if let Some(offsets) = page_start_offsets.next() {
131                let mut chunks = Vec::with_capacity(offsets.len());
132                for _ in 0..offsets.len() {
133                    chunks.push(chunk_data.next().unwrap());
134                }
135
136                *chunk = Some(Arc::new(ColumnChunkData::Sparse {
137                    length: self.metadata.column(idx).byte_range().1 as usize,
138                    data: offsets.into_iter().zip(chunks).collect(),
139                }))
140            }
141        }
142    }
143
144    pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
145        self.column_chunks
146            .iter()
147            .zip(&self.column_uncompressed_pages)
148            .enumerate()
149            .filter(|&(idx, (chunk, uncompressed_pages))| {
150                // Don't need to fetch column data if we already cache the column's pages.
151                chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
152            })
153            .map(|(idx, (_chunk, _pages))| {
154                let column = self.metadata.column(idx);
155                let (start, length) = column.byte_range();
156                start..(start + length)
157            })
158            .collect::<Vec<_>>()
159    }
160
161    /// Assigns uncompressed chunk binary data to [RowGroupBase::column_chunks]
162    /// and returns the chunk offset and binary data assigned.
163    pub(crate) fn assign_dense_chunk(
164        &mut self,
165        projection: &ProjectionMask,
166        chunk_data: Vec<Bytes>,
167    ) -> Vec<(usize, Bytes)> {
168        let mut chunk_data = chunk_data.into_iter();
169        let mut res = vec![];
170
171        for (idx, (chunk, row_group_pages)) in self
172            .column_chunks
173            .iter_mut()
174            .zip(&self.column_uncompressed_pages)
175            .enumerate()
176        {
177            if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
178                continue;
179            }
180
181            // Get the fetched page.
182            let Some(data) = chunk_data.next() else {
183                continue;
184            };
185
186            let column = self.metadata.column(idx);
187            res.push((idx, data.clone()));
188            *chunk = Some(Arc::new(ColumnChunkData::Dense {
189                offset: column.byte_range().0 as usize,
190                data,
191            }));
192        }
193        res
194    }
195
196    /// Create [PageReader] from [RowGroupBase::column_chunks]
197    pub(crate) fn column_reader(
198        &self,
199        col_idx: usize,
200    ) -> Result<SerializedPageReader<ColumnChunkData>> {
201        let page_reader = match &self.column_chunks[col_idx] {
202            None => {
203                return Err(ParquetError::General(format!(
204                    "Invalid column index {col_idx}, column was not fetched"
205                )))
206            }
207            Some(data) => {
208                let page_locations = self
209                    .offset_index
210                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
211                    .filter(|index| !index.is_empty())
212                    .map(|index| index[col_idx].page_locations.clone());
213                SerializedPageReader::new(
214                    data.clone(),
215                    self.metadata.column(col_idx),
216                    self.row_count,
217                    page_locations,
218                )?
219            }
220        };
221
222        // This column don't cache uncompressed pages.
223        Ok(page_reader)
224    }
225}
226
227/// An in-memory collection of column chunks
228pub struct InMemoryRowGroup<'a> {
229    region_id: RegionId,
230    file_id: FileId,
231    row_group_idx: usize,
232    cache_strategy: CacheStrategy,
233    file_path: &'a str,
234    /// Object store.
235    object_store: ObjectStore,
236    base: RowGroupBase<'a>,
237}
238
239impl<'a> InMemoryRowGroup<'a> {
240    /// Creates a new [InMemoryRowGroup] by `row_group_idx`.
241    ///
242    /// # Panics
243    /// Panics if the `row_group_idx` is invalid.
244    pub fn create(
245        region_id: RegionId,
246        file_id: FileId,
247        parquet_meta: &'a ParquetMetaData,
248        row_group_idx: usize,
249        cache_strategy: CacheStrategy,
250        file_path: &'a str,
251        object_store: ObjectStore,
252    ) -> Self {
253        Self {
254            region_id,
255            file_id,
256            row_group_idx,
257            cache_strategy,
258            file_path,
259            object_store,
260            base: RowGroupBase::new(parquet_meta, row_group_idx),
261        }
262    }
263
264    /// Fetches the necessary column data into memory
265    pub async fn fetch(
266        &mut self,
267        projection: &ProjectionMask,
268        selection: Option<&RowSelection>,
269    ) -> Result<()> {
270        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
271            let (fetch_ranges, page_start_offsets) =
272                self.base
273                    .calc_sparse_read_ranges(projection, offset_index, selection);
274
275            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
276            // Assign sparse chunk data to base.
277            self.base
278                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
279        } else {
280            // Now we only use cache in dense chunk data.
281            self.fetch_pages_from_cache(projection);
282
283            // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
284            // is a synchronous, CPU-bound operation.
285            yield_now().await;
286
287            // Calculate ranges to read.
288            let fetch_ranges = self.base.calc_dense_read_ranges(projection);
289
290            if fetch_ranges.is_empty() {
291                // Nothing to fetch.
292                return Ok(());
293            }
294
295            // Fetch data with ranges
296            let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
297
298            // Assigns fetched data to base.
299            let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
300
301            // Put fetched data to cache if necessary.
302            for (col_idx, data) in assigned_columns {
303                let column = self.base.metadata.column(col_idx);
304                if !cache_uncompressed_pages(column) {
305                    // For columns that have multiple uncompressed pages, we only cache the compressed page
306                    // to save memory.
307                    let page_key = PageKey::new_compressed(
308                        self.region_id,
309                        self.file_id,
310                        self.row_group_idx,
311                        col_idx,
312                    );
313                    self.cache_strategy
314                        .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
315                }
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Fetches pages for columns if cache is enabled.
323    /// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
324    fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
325        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
326        self.base
327            .column_chunks
328            .iter_mut()
329            .enumerate()
330            .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
331            .for_each(|(idx, chunk)| {
332                let column = self.base.metadata.column(idx);
333                if cache_uncompressed_pages(column) {
334                    // Fetches uncompressed pages for the row group.
335                    let page_key = PageKey::new_uncompressed(
336                        self.region_id,
337                        self.file_id,
338                        self.row_group_idx,
339                        idx,
340                    );
341                    self.base.column_uncompressed_pages[idx] =
342                        self.cache_strategy.get_pages(&page_key);
343                } else {
344                    // Fetches the compressed page from the cache.
345                    let page_key = PageKey::new_compressed(
346                        self.region_id,
347                        self.file_id,
348                        self.row_group_idx,
349                        idx,
350                    );
351
352                    *chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| {
353                        Arc::new(ColumnChunkData::Dense {
354                            offset: column.byte_range().0 as usize,
355                            data: page_value.compressed.clone(),
356                        })
357                    });
358                }
359            });
360    }
361
362    /// Try to fetch data from WriteCache,
363    /// if not in WriteCache, fetch data from object store directly.
364    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
365        let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
366        match self.fetch_ranges_from_write_cache(key, ranges).await {
367            Some(data) => Ok(data),
368            None => {
369                // Fetch data from object store.
370                let _timer = READ_STAGE_ELAPSED
371                    .with_label_values(&["cache_miss_read"])
372                    .start_timer();
373                let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
374                    .await
375                    .map_err(|e| ParquetError::External(Box::new(e)))?;
376                Ok(data)
377            }
378        }
379    }
380
381    /// Fetches data from write cache.
382    /// Returns `None` if the data is not in the cache.
383    async fn fetch_ranges_from_write_cache(
384        &self,
385        key: IndexKey,
386        ranges: &[Range<u64>],
387    ) -> Option<Vec<Bytes>> {
388        if let Some(cache) = self.cache_strategy.write_cache() {
389            return cache.file_cache().read_ranges(key, ranges).await;
390        }
391        None
392    }
393
394    /// Creates a page reader to read column at `i`.
395    fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
396        if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
397            debug_assert!(!cached_pages.row_group.is_empty());
398            // Hits the row group level page cache.
399            return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
400        }
401
402        let page_reader = self.base.column_reader(i)?;
403
404        let column = self.base.metadata.column(i);
405        if cache_uncompressed_pages(column) {
406            // This column use row group level page cache.
407            // We collect all pages and put them into the cache.
408            let pages = page_reader.collect::<Result<Vec<_>>>()?;
409            let page_value = Arc::new(PageValue::new_row_group(pages));
410            let page_key =
411                PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
412            self.cache_strategy.put_pages(page_key, page_value.clone());
413
414            return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
415        }
416
417        // This column don't cache uncompressed pages.
418        Ok(Box::new(page_reader))
419    }
420}
421
422/// Returns whether we cache uncompressed pages for the column.
423fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
424    // If the row group only has a data page, cache the whole row group as
425    // it might be faster than caching a compressed page.
426    column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
427}
428
429impl RowGroups for InMemoryRowGroup<'_> {
430    fn num_rows(&self) -> usize {
431        self.base.row_count
432    }
433
434    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
435        let page_reader = self.column_page_reader(i)?;
436
437        Ok(Box::new(ColumnChunkIterator {
438            reader: Some(Ok(page_reader)),
439        }))
440    }
441}
442
443/// An in-memory column chunk
444#[derive(Clone)]
445pub(crate) enum ColumnChunkData {
446    /// Column chunk data representing only a subset of data pages
447    Sparse {
448        /// Length of the full column chunk
449        length: usize,
450        /// Set of data pages included in this sparse chunk. Each element is a tuple
451        /// of (page offset, page data)
452        data: Vec<(usize, Bytes)>,
453    },
454    /// Full column chunk and its offset
455    Dense { offset: usize, data: Bytes },
456}
457
458impl ColumnChunkData {
459    fn get(&self, start: u64) -> Result<Bytes> {
460        match &self {
461            ColumnChunkData::Sparse { data, .. } => data
462                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
463                .map(|idx| data[idx].1.clone())
464                .map_err(|_| {
465                    ParquetError::General(format!(
466                        "Invalid offset in sparse column chunk data: {start}"
467                    ))
468                }),
469            ColumnChunkData::Dense { offset, data } => {
470                let start = start as usize - *offset;
471                Ok(data.slice(start..))
472            }
473        }
474    }
475}
476
477impl Length for ColumnChunkData {
478    fn len(&self) -> u64 {
479        match &self {
480            ColumnChunkData::Sparse { length, .. } => *length as u64,
481            ColumnChunkData::Dense { data, .. } => data.len() as u64,
482        }
483    }
484}
485
486impl ChunkReader for ColumnChunkData {
487    type T = bytes::buf::Reader<Bytes>;
488
489    fn get_read(&self, start: u64) -> Result<Self::T> {
490        Ok(self.get(start)?.reader())
491    }
492
493    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
494        Ok(self.get(start)?.slice(..length))
495    }
496}
497
498/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
499pub(crate) struct ColumnChunkIterator {
500    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
501}
502
503impl Iterator for ColumnChunkIterator {
504    type Item = Result<Box<dyn PageReader>>;
505
506    fn next(&mut self) -> Option<Self::Item> {
507        self.reader.take()
508    }
509}
510
511impl PageIterator for ColumnChunkIterator {}