puffin/partial_reader/
async.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16use std::ops::Range;
17
18use async_trait::async_trait;
19use bytes::{BufMut, Bytes};
20use common_base::range_read::{Metadata, RangeReader};
21
22use crate::partial_reader::PartialReader;
23
24#[async_trait]
25impl<R: RangeReader> RangeReader for PartialReader<R> {
26    async fn metadata(&self) -> io::Result<Metadata> {
27        Ok(Metadata {
28            content_length: self.size,
29        })
30    }
31
32    async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
33        let absolute_range_start = self.offset + range.start;
34        if absolute_range_start >= self.offset + self.size {
35            return Err(io::Error::new(
36                io::ErrorKind::UnexpectedEof,
37                "Start of range is out of bounds",
38            ));
39        }
40        let absolute_range_end = (self.offset + range.end).min(self.offset + self.size);
41        let absolute_range = absolute_range_start..absolute_range_end;
42
43        let result = self.source.read(absolute_range.clone()).await?;
44        Ok(result)
45    }
46
47    async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
48        let absolute_range_start = self.offset + range.start;
49        if absolute_range_start >= self.offset + self.size {
50            return Err(io::Error::new(
51                io::ErrorKind::UnexpectedEof,
52                "Start of range is out of bounds",
53            ));
54        }
55        let absolute_range_end = (self.offset + range.end).min(self.offset + self.size);
56        let absolute_range = absolute_range_start..absolute_range_end;
57
58        self.source.read_into(absolute_range.clone(), buf).await?;
59        Ok(())
60    }
61
62    async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
63        let absolute_ranges = ranges
64            .iter()
65            .map(|range| {
66                let start = self.offset + range.start;
67
68                if start >= self.offset + self.size {
69                    return Err(io::Error::new(
70                        io::ErrorKind::UnexpectedEof,
71                        "Start of range is out of bounds",
72                    ));
73                }
74
75                let end = (self.offset + range.end).min(self.offset + self.size);
76                Ok(start..end)
77            })
78            .collect::<io::Result<Vec<_>>>()?;
79
80        let results = self.source.read_vec(&absolute_ranges).await?;
81
82        Ok(results)
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// A portion covering the whole source returns the source unchanged.
    #[tokio::test]
    async fn read_all_data_in_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data.clone(), 0, 100);
        let buf = reader.read(0..100).await.unwrap();
        assert_eq!(*buf, data);
    }

    /// Relative ranges are shifted by the portion's offset.
    #[tokio::test]
    async fn read_part_of_data_in_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(0..30).await.unwrap();
        assert_eq!(*buf, (10..40).collect::<Vec<u8>>());
    }

    /// A range starting at or beyond the portion's size is rejected.
    #[tokio::test]
    async fn seek_past_end_of_portion_returns_error() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        // a start exactly at the end of the portion is already out of bounds
        let err = reader.read(30..31).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
        // seeking past the portion returns an error
        assert!(reader.read(31..32).await.is_err());
    }

    /// Reading a prefix of the portion yields exactly the requested bytes.
    #[tokio::test]
    async fn read_leading_bytes_of_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(0..20).await.unwrap();
        assert_eq!(*buf, (10..30).collect::<Vec<u8>>());
    }

    /// A range whose end exceeds the portion is truncated at the portion's end.
    #[tokio::test]
    async fn read_clamps_end_to_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(20..100).await.unwrap();
        assert_eq!(*buf, (30..40).collect::<Vec<u8>>());
    }

    /// `read_vec` translates and clamps every range independently.
    #[tokio::test]
    async fn read_vec_translates_each_range() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);

        let bufs = reader.read_vec(&[0..5, 25..100]).await.unwrap();
        assert_eq!(bufs.len(), 2);
        assert_eq!(*bufs[0], (10..15).collect::<Vec<u8>>());
        assert_eq!(*bufs[1], (35..40).collect::<Vec<u8>>());

        // one out-of-bounds start fails the whole batch
        assert!(reader.read_vec(&[0..5, 30..31]).await.is_err());
    }

    /// The reported content length is the portion's size, not the source's.
    #[tokio::test]
    async fn metadata_reports_portion_size() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        assert_eq!(reader.metadata().await.unwrap().content_length, 30);
    }
}
120}