puffin/file_format/reader/
footer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16
17use common_base::range_read::RangeReader;
18use snafu::{ensure, ResultExt};
19
20use crate::error::{
21    DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu,
22    ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
23};
24use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
25use crate::file_metadata::FileMetadata;
26
/// Default prefetch size, in bytes, for the footer reader: 8 KiB.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;
29
/// Reads the footer of a Puffin data file and decodes it into [`FileMetadata`].
///
/// The footer sits at the very end of the file and is decoded back-to-front:
/// the trailing magic first, then the flags, then the payload size, and finally
/// the payload itself.
///
/// To reduce the number of range reads, the reader fetches a configurable number
/// of trailing bytes (the "prefetch size") in one request and only issues a
/// second request when the payload does not fit in what was fetched.
///
/// ```text
/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic
///                [4]       [?]     [4]         [4]   [4]
/// ```
pub struct PuffinFileFooterReader<R> {
    /// Underlying reader the puffin file bytes are fetched from.
    source: R,
    /// Total length of the puffin file, in bytes.
    file_size: u64,
    /// Number of trailing bytes to fetch in the first read; when `None`,
    /// `MIN_FILE_SIZE` is used instead.
    prefetch_size: Option<u64>,
}
50
51impl<'a, R: RangeReader + 'a> PuffinFileFooterReader<R> {
52    pub fn new(source: R, content_len: u64) -> Self {
53        Self {
54            source,
55            file_size: content_len,
56            prefetch_size: None,
57        }
58    }
59
60    fn prefetch_size(&self) -> u64 {
61        self.prefetch_size.unwrap_or(MIN_FILE_SIZE)
62    }
63
64    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
65        self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE));
66        self
67    }
68
69    pub async fn metadata(&'a mut self) -> Result<FileMetadata> {
70        // Note: prefetch > content_len is allowed, since we're using saturating_sub.
71        let footer_start = self.file_size.saturating_sub(self.prefetch_size());
72        let suffix = self
73            .source
74            .read(footer_start..self.file_size)
75            .await
76            .context(ReadSnafu)?;
77        let suffix_len = suffix.len();
78
79        // check the magic
80        let magic = Self::read_tailing_four_bytes(&suffix)?;
81        ensure!(magic == MAGIC, MagicNotMatchedSnafu);
82
83        let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize])?;
84        let length = self.decode_payload_size(
85            &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize],
86        )?;
87        let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE;
88
89        // Did not fetch the entire file metadata in the initial read, need to make a second request.
90        if length > suffix_len as u64 - footer_size {
91            let metadata_start = self.file_size - length - footer_size;
92            let meta = self
93                .source
94                .read(metadata_start..self.file_size - footer_size)
95                .await
96                .context(ReadSnafu)?;
97            self.parse_payload(&flags, &meta)
98        } else {
99            let metadata_start = self.file_size - length - footer_size - footer_start;
100            let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
101            self.parse_payload(&flags, meta)
102        }
103    }
104
105    fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result<FileMetadata> {
106        if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
107            let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
108            let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
109            Ok(res)
110        } else {
111            serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
112        }
113    }
114
115    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
116        let suffix_len = suffix.len();
117        ensure!(suffix_len >= 4, InvalidPuffinFooterSnafu);
118        let mut bytes = [0; 4];
119        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
120
121        Ok(bytes)
122    }
123
124    fn decode_flags(&self, suffix: &[u8]) -> Result<Flags> {
125        let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
126        Ok(Flags::from_bits_truncate(flags))
127    }
128
129    fn decode_payload_size(&self, suffix: &[u8]) -> Result<u64> {
130        let payload_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
131
132        ensure!(
133            payload_size >= 0,
134            UnexpectedFooterPayloadSizeSnafu { size: payload_size }
135        );
136        let payload_size = payload_size as u64;
137        ensure!(
138            payload_size <= self.file_size - MIN_FILE_SIZE,
139            UnexpectedFooterPayloadSizeSnafu {
140                size: self.file_size as i32
141            }
142        );
143
144        Ok(payload_size)
145    }
146}