puffin/partial_reader/
async.rs1use std::io;
16use std::ops::Range;
17
18use async_trait::async_trait;
19use bytes::{BufMut, Bytes};
20use common_base::range_read::{Metadata, RangeReader};
21
22use crate::partial_reader::PartialReader;
23
24#[async_trait]
25impl<R: RangeReader> RangeReader for PartialReader<R> {
26 async fn metadata(&self) -> io::Result<Metadata> {
27 Ok(Metadata {
28 content_length: self.size,
29 })
30 }
31
32 async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
33 let absolute_range_start = self.offset + range.start;
34 if absolute_range_start >= self.offset + self.size {
35 return Err(io::Error::new(
36 io::ErrorKind::UnexpectedEof,
37 "Start of range is out of bounds",
38 ));
39 }
40 let absolute_range_end = (self.offset + range.end).min(self.offset + self.size);
41 let absolute_range = absolute_range_start..absolute_range_end;
42
43 let result = self.source.read(absolute_range.clone()).await?;
44 Ok(result)
45 }
46
47 async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
48 let absolute_range_start = self.offset + range.start;
49 if absolute_range_start >= self.offset + self.size {
50 return Err(io::Error::new(
51 io::ErrorKind::UnexpectedEof,
52 "Start of range is out of bounds",
53 ));
54 }
55 let absolute_range_end = (self.offset + range.end).min(self.offset + self.size);
56 let absolute_range = absolute_range_start..absolute_range_end;
57
58 self.source.read_into(absolute_range.clone(), buf).await?;
59 Ok(())
60 }
61
62 async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
63 let absolute_ranges = ranges
64 .iter()
65 .map(|range| {
66 let start = self.offset + range.start;
67
68 if start >= self.offset + self.size {
69 return Err(io::Error::new(
70 io::ErrorKind::UnexpectedEof,
71 "Start of range is out of bounds",
72 ));
73 }
74
75 let end = (self.offset + range.end).min(self.offset + self.size);
76 Ok(start..end)
77 })
78 .collect::<io::Result<Vec<_>>>()?;
79
80 let results = self.source.read_vec(&absolute_ranges).await?;
81
82 Ok(results)
83 }
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// A portion that spans the whole source yields every byte.
    #[tokio::test]
    async fn read_all_data_in_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data.clone(), 0, 100);
        let buf = reader.read(0..100).await.unwrap();
        assert_eq!(*buf, data);
    }

    /// Ranges are relative to the portion start, so `0..30` of a portion at
    /// offset 10 maps to bytes `10..40` of the source.
    #[tokio::test]
    async fn read_part_of_data_in_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(0..30).await.unwrap();
        assert_eq!(*buf, (10..40).collect::<Vec<u8>>());
    }

    /// A range whose start lies beyond the portion is rejected.
    #[tokio::test]
    async fn seek_past_end_of_portion_returns_error() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        assert!(reader.read(31..32).await.is_err());
    }

    /// A range starting exactly at the portion size is also out of bounds.
    #[tokio::test]
    async fn read_starting_at_end_of_portion_returns_error() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let err = reader.read(30..31).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    /// A range that runs past the portion is truncated at the portion end.
    #[tokio::test]
    async fn read_is_clamped_to_end_of_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(20..40).await.unwrap();
        assert_eq!(*buf, (30..40).collect::<Vec<u8>>());
    }

    /// `metadata` reports the portion size rather than the source size.
    #[tokio::test]
    async fn metadata_reports_portion_size() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.content_length, 30);
    }

    /// `read_into` applies the same bounds check as `read`.
    #[tokio::test]
    async fn read_into_past_end_of_portion_returns_error() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let mut buf: Vec<u8> = Vec::new();
        assert!(reader.read_into(31..32, &mut buf).await.is_err());
    }

    /// One out-of-bounds range fails the whole `read_vec` call.
    #[tokio::test]
    async fn read_vec_with_out_of_bounds_range_returns_error() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        assert!(reader.read_vec(&[0..5, 30..31]).await.is_err());
    }

    // NOTE(review): the name mentions `is_eof`, which the async impl in this
    // file does not define; consider renaming. The test now checks the bytes
    // returned for a read that stays inside the portion.
    #[tokio::test]
    async fn is_eof_returns_true_at_end_of_portion() {
        let data: Vec<u8> = (0..100).collect();
        let reader = PartialReader::new(data, 10, 30);
        let buf = reader.read(0..20).await.unwrap();
        assert_eq!(*buf, (10..30).collect::<Vec<u8>>());
    }
}