mito2/memtable/bulk/
row_group_reader.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use datatypes::arrow::array::RecordBatch;
20use datatypes::arrow::error::ArrowError;
21use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
22use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
23use parquet::column::page::{PageIterator, PageReader};
24use parquet::file::metadata::ParquetMetaData;
25use snafu::ResultExt;
26
27use crate::error;
28use crate::error::ReadDataPartSnafu;
29use crate::memtable::bulk::context::BulkIterContextRef;
30use crate::sst::parquet::format::ReadFormat;
31use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
32use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
33use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
34
35/// Helper for reading specific row group inside Memtable Parquet parts.
36// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since
37// it's a workaround for lacking of keyword generics.
38pub struct MemtableRowGroupPageFetcher<'a> {
39    /// Shared structs for reading row group.
40    base: RowGroupBase<'a>,
41    bytes: Bytes,
42}
43
44impl<'a> MemtableRowGroupPageFetcher<'a> {
45    pub(crate) fn create(
46        row_group_idx: usize,
47        parquet_meta: &'a ParquetMetaData,
48        bytes: Bytes,
49    ) -> Self {
50        Self {
51            // the cached `column_uncompressed_pages` would never be used in Memtable readers.
52            base: RowGroupBase::new(parquet_meta, row_group_idx),
53            bytes,
54        }
55    }
56
57    /// Fetches column pages from memory file.
58    pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
59        if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
60            // Selection provided.
61            let (fetch_ranges, page_start_offsets) =
62                self.base
63                    .calc_sparse_read_ranges(projection, offset_index, selection);
64            if fetch_ranges.is_empty() {
65                return;
66            }
67            let chunk_data = self.fetch_bytes(&fetch_ranges);
68
69            self.base
70                .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
71        } else {
72            let fetch_ranges = self.base.calc_dense_read_ranges(projection);
73            if fetch_ranges.is_empty() {
74                // Nothing to fetch.
75                return;
76            }
77            let chunk_data = self.fetch_bytes(&fetch_ranges);
78            self.base.assign_dense_chunk(projection, chunk_data);
79        }
80    }
81
82    fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
83        ranges
84            .iter()
85            .map(|range| self.bytes.slice(range.start as usize..range.end as usize))
86            .collect()
87    }
88
89    /// Creates a page reader to read column at `i`.
90    fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
91        let reader = self.base.column_reader(i)?;
92        Ok(Box::new(reader))
93    }
94}
95
96impl RowGroups for MemtableRowGroupPageFetcher<'_> {
97    fn num_rows(&self) -> usize {
98        self.base.row_count
99    }
100
101    fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
102        Ok(Box::new(ColumnChunkIterator {
103            reader: Some(self.column_page_reader(i)),
104        }))
105    }
106}
107
108impl RowGroupReaderContext for BulkIterContextRef {
109    fn map_result(
110        &self,
111        result: Result<Option<RecordBatch>, ArrowError>,
112    ) -> error::Result<Option<RecordBatch>> {
113        result.context(error::DecodeArrowRowGroupSnafu)
114    }
115
116    fn read_format(&self) -> &ReadFormat {
117        self.as_ref().read_format()
118    }
119}
120
121pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;
122
123pub(crate) struct MemtableRowGroupReaderBuilder {
124    context: BulkIterContextRef,
125    projection: ProjectionMask,
126    parquet_metadata: Arc<ParquetMetaData>,
127    field_levels: FieldLevels,
128    data: Bytes,
129}
130
131impl MemtableRowGroupReaderBuilder {
132    pub(crate) fn try_new(
133        context: BulkIterContextRef,
134        projection: ProjectionMask,
135        parquet_metadata: Arc<ParquetMetaData>,
136        data: Bytes,
137    ) -> error::Result<Self> {
138        let parquet_schema_desc = parquet_metadata.file_metadata().schema_descr();
139        let hint = Some(context.read_format().arrow_schema().fields());
140        let field_levels =
141            parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
142                .context(ReadDataPartSnafu)?;
143        Ok(Self {
144            context,
145            projection,
146            parquet_metadata,
147            field_levels,
148            data,
149        })
150    }
151
152    /// Builds a reader to read the row group at `row_group_idx` from memory.
153    pub(crate) fn build_row_group_reader(
154        &self,
155        row_group_idx: usize,
156        row_selection: Option<RowSelection>,
157    ) -> error::Result<MemtableRowGroupReader> {
158        let mut row_group = MemtableRowGroupPageFetcher::create(
159            row_group_idx,
160            &self.parquet_metadata,
161            self.data.clone(),
162        );
163        // Fetches data from memory part. Currently, row selection is not supported.
164        row_group.fetch(&self.projection, row_selection.as_ref());
165
166        // Builds the parquet reader.
167        // Now the row selection is None.
168        let reader = ParquetRecordBatchReader::try_new_with_row_groups(
169            &self.field_levels,
170            &row_group,
171            DEFAULT_READ_BATCH_SIZE,
172            row_selection,
173        )
174        .context(ReadDataPartSnafu)?;
175        Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
176    }
177}