mito2/memtable/bulk/
row_group_reader.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use parquet::arrow::ProjectionMask;
19use parquet::arrow::arrow_reader::{
20 ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
21 ParquetRecordBatchReaderBuilder, RowSelection,
22};
23use parquet::file::metadata::ParquetMetaData;
24use snafu::ResultExt;
25
26use crate::error;
27use crate::error::ReadDataPartSnafu;
28use crate::memtable::bulk::chunk_reader::MemtableChunkReader;
29use crate::memtable::bulk::context::BulkIterContextRef;
30use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
31
32pub(crate) struct MemtableRowGroupReaderBuilder {
33 projection: ProjectionMask,
34 arrow_metadata: ArrowReaderMetadata,
35 data: Bytes,
36}
37
38impl MemtableRowGroupReaderBuilder {
39 pub(crate) fn try_new(
40 context: &BulkIterContextRef,
41 projection: ProjectionMask,
42 parquet_metadata: Arc<ParquetMetaData>,
43 data: Bytes,
44 ) -> error::Result<Self> {
45 let arrow_reader_options =
47 ArrowReaderOptions::new().with_schema(context.read_format().arrow_schema().clone());
48 let arrow_metadata =
49 ArrowReaderMetadata::try_new(parquet_metadata.clone(), arrow_reader_options)
50 .context(ReadDataPartSnafu)?;
51 Ok(Self {
52 projection,
53 arrow_metadata,
54 data,
55 })
56 }
57
58 pub(crate) fn build_row_group_reader(
60 &self,
61 row_group_idx: usize,
62 row_selection: Option<RowSelection>,
63 ) -> error::Result<ParquetRecordBatchReader> {
64 let chunk_reader = MemtableChunkReader::new(self.data.clone());
65
66 let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata(
67 chunk_reader,
68 self.arrow_metadata.clone(),
69 )
70 .with_row_groups(vec![row_group_idx])
71 .with_projection(self.projection.clone())
72 .with_batch_size(DEFAULT_READ_BATCH_SIZE);
73
74 if let Some(selection) = row_selection {
75 builder = builder.with_row_selection(selection);
76 }
77
78 builder.build().context(ReadDataPartSnafu)
79 }
80}