puffin/file_format/reader/
file.rs1use async_trait::async_trait;
16use common_base::range_read::RangeReader;
17use snafu::{ensure, ResultExt};
18
19use crate::blob_metadata::BlobMetadata;
20use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu};
21use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE;
22use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader};
23use crate::file_format::MIN_FILE_SIZE;
24use crate::file_metadata::FileMetadata;
25use crate::partial_reader::PartialReader;
26
27pub struct PuffinFileReader<R> {
34 source: R,
36
37 metadata: Option<FileMetadata>,
39}
40
41impl<R> PuffinFileReader<R> {
42 pub fn new(source: R) -> Self {
43 Self {
44 source,
45 metadata: None,
46 }
47 }
48
49 pub fn with_metadata(mut self, metadata: Option<FileMetadata>) -> Self {
50 self.metadata = metadata;
51 self
52 }
53
54 fn validate_file_size(file_size: u64) -> Result<()> {
55 ensure!(
56 file_size >= MIN_FILE_SIZE,
57 UnexpectedPuffinFileSizeSnafu {
58 min_file_size: MIN_FILE_SIZE,
59 actual_file_size: file_size
60 }
61 );
62 Ok(())
63 }
64
65 pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
67 PartialReader::new(
68 self.source,
69 blob_metadata.offset as _,
70 blob_metadata.length as _,
71 )
72 }
73}
74
75#[async_trait]
76impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
77 type Reader = PartialReader<&'a R>;
78
79 async fn metadata(&'a mut self) -> Result<FileMetadata> {
80 if let Some(metadata) = &self.metadata {
81 return Ok(metadata.clone());
82 }
83 let file_size = self.get_file_size_async().await?;
84 let mut reader = PuffinFileFooterReader::new(&self.source, file_size)
85 .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
86 let metadata = reader.metadata().await?;
87 self.metadata = Some(metadata.clone());
88 Ok(metadata)
89 }
90
91 fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
92 Ok(PartialReader::new(
93 &self.source,
94 blob_metadata.offset as _,
95 blob_metadata.length as _,
96 ))
97 }
98}
99
100impl<R: RangeReader> PuffinFileReader<R> {
101 async fn get_file_size_async(&mut self) -> Result<u64> {
102 let file_size = self
103 .source
104 .metadata()
105 .await
106 .context(ReadSnafu)?
107 .content_length;
108 Self::validate_file_size(file_size)?;
109 Ok(file_size)
110 }
111}