Skip to main content

mito2/memtable/bulk/
row_group_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use bytes::Bytes;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{
20    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
21    ParquetRecordBatchReaderBuilder, RowSelection,
22};
23use parquet::file::metadata::ParquetMetaData;
24use snafu::ResultExt;
25
26use crate::error;
27use crate::error::ReadDataPartSnafu;
28use crate::memtable::bulk::chunk_reader::MemtableChunkReader;
29use crate::memtable::bulk::context::BulkIterContextRef;
30use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
31
/// Builder that creates [`ParquetRecordBatchReader`]s for individual row groups
/// of a data part held in memory.
///
/// The Arrow reader metadata is prepared once in [`Self::try_new`] and then
/// reused by every call to [`Self::build_row_group_reader`].
pub(crate) struct MemtableRowGroupReaderBuilder {
    /// Column projection applied to every reader built from this builder.
    projection: ProjectionMask,
    /// Arrow reader metadata created from the Parquet metadata and the
    /// Arrow schema of the read format.
    arrow_metadata: ArrowReaderMetadata,
    /// Raw bytes of the data part; each reader accesses them through a
    /// `MemtableChunkReader`.
    data: Bytes,
}
37
38impl MemtableRowGroupReaderBuilder {
39    pub(crate) fn try_new(
40        context: &BulkIterContextRef,
41        projection: ProjectionMask,
42        parquet_metadata: Arc<ParquetMetaData>,
43        data: Bytes,
44    ) -> error::Result<Self> {
45        // Create ArrowReaderMetadata for building the reader.
46        let arrow_reader_options =
47            ArrowReaderOptions::new().with_schema(context.read_format().arrow_schema().clone());
48        let arrow_metadata =
49            ArrowReaderMetadata::try_new(parquet_metadata.clone(), arrow_reader_options)
50                .context(ReadDataPartSnafu)?;
51        Ok(Self {
52            projection,
53            arrow_metadata,
54            data,
55        })
56    }
57
58    /// Builds a reader to read the row group at `row_group_idx` from memory.
59    pub(crate) fn build_row_group_reader(
60        &self,
61        row_group_idx: usize,
62        row_selection: Option<RowSelection>,
63    ) -> error::Result<ParquetRecordBatchReader> {
64        let chunk_reader = MemtableChunkReader::new(self.data.clone());
65
66        let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata(
67            chunk_reader,
68            self.arrow_metadata.clone(),
69        )
70        .with_row_groups(vec![row_group_idx])
71        .with_projection(self.projection.clone())
72        .with_batch_size(DEFAULT_READ_BATCH_SIZE);
73
74        if let Some(selection) = row_selection {
75            builder = builder.with_row_selection(selection);
76        }
77
78        builder.build().context(ReadDataPartSnafu)
79    }
80}