mito2/memtable/bulk/
chunk_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! ChunkReader implementation for in-memory parquet bytes.
16
17use std::io::Cursor;
18
19use bytes::Bytes;
20use parquet::errors::{ParquetError, Result};
21use parquet::file::reader::{ChunkReader, Length};
22
23/// A [ChunkReader] implementation for in-memory parquet bytes.
24///
25/// This provides byte access to parquet data stored in memory (Bytes),
26/// used for reading parquet data from bulk memtable.
27#[derive(Clone)]
28pub struct MemtableChunkReader {
29    /// The in-memory parquet data.
30    data: Bytes,
31}
32
33impl MemtableChunkReader {
34    /// Creates a new [MemtableChunkReader] from the given bytes.
35    pub fn new(data: Bytes) -> Self {
36        Self { data }
37    }
38}
39
40impl Length for MemtableChunkReader {
41    fn len(&self) -> u64 {
42        self.data.len() as u64
43    }
44}
45
46impl ChunkReader for MemtableChunkReader {
47    type T = Cursor<Bytes>;
48
49    fn get_read(&self, start: u64) -> Result<Self::T> {
50        let start = start as usize;
51        if start > self.data.len() {
52            return Err(ParquetError::IndexOutOfBound(start, self.data.len()));
53        }
54        Ok(Cursor::new(self.data.slice(start..)))
55    }
56
57    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
58        let start = start as usize;
59        let end = start + length;
60        if end > self.data.len() {
61            return Err(ParquetError::IndexOutOfBound(end, self.data.len()));
62        }
63        Ok(self.data.slice(start..end))
64    }
65}