puffin/file_format/reader/
file.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_base::range_read::RangeReader;
17use snafu::{ensure, ResultExt};
18
19use crate::blob_metadata::BlobMetadata;
20use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu};
21use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE;
22use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader};
23use crate::file_format::MIN_FILE_SIZE;
24use crate::file_metadata::FileMetadata;
25use crate::partial_reader::PartialReader;
26
27/// Puffin file reader, implemented [`PuffinSyncReader`] and [`PuffinAsyncReader`]
28///
29/// ```text
30/// File layout: Magic Blob₁ Blob₂ ... Blobₙ Footer
31///              [4]   [?]   [?]       [?]   [?]
32/// ```
33pub struct PuffinFileReader<R> {
34    /// The source of the puffin file
35    source: R,
36
37    /// The metadata of the puffin file, which is parsed from the footer
38    metadata: Option<FileMetadata>,
39}
40
41impl<R> PuffinFileReader<R> {
42    pub fn new(source: R) -> Self {
43        Self {
44            source,
45            metadata: None,
46        }
47    }
48
49    pub fn with_metadata(mut self, metadata: Option<FileMetadata>) -> Self {
50        self.metadata = metadata;
51        self
52    }
53
54    fn validate_file_size(file_size: u64) -> Result<()> {
55        ensure!(
56            file_size >= MIN_FILE_SIZE,
57            UnexpectedPuffinFileSizeSnafu {
58                min_file_size: MIN_FILE_SIZE,
59                actual_file_size: file_size
60            }
61        );
62        Ok(())
63    }
64
65    /// Converts the reader into an owned blob reader.
66    pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
67        PartialReader::new(
68            self.source,
69            blob_metadata.offset as _,
70            blob_metadata.length as _,
71        )
72    }
73}
74
75#[async_trait]
76impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader<R> {
77    type Reader = PartialReader<&'a R>;
78
79    async fn metadata(&'a mut self) -> Result<FileMetadata> {
80        if let Some(metadata) = &self.metadata {
81            return Ok(metadata.clone());
82        }
83        let file_size = self.get_file_size_async().await?;
84        let mut reader = PuffinFileFooterReader::new(&self.source, file_size)
85            .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
86        let metadata = reader.metadata().await?;
87        self.metadata = Some(metadata.clone());
88        Ok(metadata)
89    }
90
91    fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
92        Ok(PartialReader::new(
93            &self.source,
94            blob_metadata.offset as _,
95            blob_metadata.length as _,
96        ))
97    }
98}
99
100impl<R: RangeReader> PuffinFileReader<R> {
101    async fn get_file_size_async(&mut self) -> Result<u64> {
102        let file_size = self
103            .source
104            .metadata()
105            .await
106            .context(ReadSnafu)?
107            .content_length;
108        Self::validate_file_size(file_size)?;
109        Ok(file_size)
110    }
111}