puffin/file_format/reader/
footer.rs1use std::io::Cursor;
16
17use common_base::range_read::RangeReader;
18use snafu::{ensure, ResultExt};
19
20use crate::error::{
21 DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu,
22 ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu,
23};
24use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE};
25use crate::file_metadata::FileMetadata;
26
27pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; pub struct PuffinFileFooterReader<R> {
43 source: R,
45 file_size: u64,
47 prefetch_size: Option<u64>,
49}
50
51impl<'a, R: RangeReader + 'a> PuffinFileFooterReader<R> {
52 pub fn new(source: R, content_len: u64) -> Self {
53 Self {
54 source,
55 file_size: content_len,
56 prefetch_size: None,
57 }
58 }
59
60 fn prefetch_size(&self) -> u64 {
61 self.prefetch_size.unwrap_or(MIN_FILE_SIZE)
62 }
63
64 pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
65 self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE));
66 self
67 }
68
69 pub async fn metadata(&'a mut self) -> Result<FileMetadata> {
70 let footer_start = self.file_size.saturating_sub(self.prefetch_size());
72 let suffix = self
73 .source
74 .read(footer_start..self.file_size)
75 .await
76 .context(ReadSnafu)?;
77 let suffix_len = suffix.len();
78
79 let magic = Self::read_tailing_four_bytes(&suffix)?;
81 ensure!(magic == MAGIC, MagicNotMatchedSnafu);
82
83 let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize])?;
84 let length = self.decode_payload_size(
85 &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize],
86 )?;
87 let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE;
88
89 if length > suffix_len as u64 - footer_size {
91 let metadata_start = self.file_size - length - footer_size;
92 let meta = self
93 .source
94 .read(metadata_start..self.file_size - footer_size)
95 .await
96 .context(ReadSnafu)?;
97 self.parse_payload(&flags, &meta)
98 } else {
99 let metadata_start = self.file_size - length - footer_size - footer_start;
100 let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
101 self.parse_payload(&flags, meta)
102 }
103 }
104
105 fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result<FileMetadata> {
106 if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) {
107 let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes));
108 let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?;
109 Ok(res)
110 } else {
111 serde_json::from_slice(bytes).context(DeserializeJsonSnafu)
112 }
113 }
114
115 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
116 let suffix_len = suffix.len();
117 ensure!(suffix_len >= 4, InvalidPuffinFooterSnafu);
118 let mut bytes = [0; 4];
119 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
120
121 Ok(bytes)
122 }
123
124 fn decode_flags(&self, suffix: &[u8]) -> Result<Flags> {
125 let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
126 Ok(Flags::from_bits_truncate(flags))
127 }
128
129 fn decode_payload_size(&self, suffix: &[u8]) -> Result<u64> {
130 let payload_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?);
131
132 ensure!(
133 payload_size >= 0,
134 UnexpectedFooterPayloadSizeSnafu { size: payload_size }
135 );
136 let payload_size = payload_size as u64;
137 ensure!(
138 payload_size <= self.file_size - MIN_FILE_SIZE,
139 UnexpectedFooterPayloadSizeSnafu {
140 size: self.file_size as i32
141 }
142 );
143
144 Ok(payload_size)
145 }
146}