puffin/puffin_manager/fs_puffin_manager/
reader.rs1use std::io;
16use std::ops::Range;
17use std::sync::Arc;
18
19use async_compression::futures::bufread::ZstdDecoder;
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{AsyncReadAdapter, Metadata, RangeReader, SizeAwareRangeReader};
23use futures::io::BufReader;
24use futures::{AsyncRead, AsyncWrite};
25use snafu::{ensure, OptionExt, ResultExt};
26
27use crate::blob_metadata::{BlobMetadata, CompressionCodec};
28use crate::error::{
29 BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
30 MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
31};
32use crate::file_format::reader::{AsyncReader, PuffinFileReader};
33use crate::file_metadata::FileMetadata;
34use crate::partial_reader::PartialReader;
35use crate::puffin_manager::file_accessor::PuffinFileAccessor;
36use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
37use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
38use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
39use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
40
41pub struct FsPuffinReader<S, F>
43where
44 S: Stager + 'static,
45 F: PuffinFileAccessor + Clone,
46{
47 handle: F::FileHandle,
49
50 file_size_hint: Option<u64>,
52
53 stager: S,
55
56 puffin_file_accessor: F,
58
59 puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
61}
62
63impl<S, F> FsPuffinReader<S, F>
64where
65 S: Stager + 'static,
66 F: PuffinFileAccessor + Clone,
67{
68 pub(crate) fn new(
69 handle: F::FileHandle,
70 stager: S,
71 puffin_file_accessor: F,
72 puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
73 ) -> Self {
74 Self {
75 handle,
76 file_size_hint: None,
77 stager,
78 puffin_file_accessor,
79 puffin_file_metadata_cache,
80 }
81 }
82}
83
84#[async_trait]
85impl<S, F> PuffinReader for FsPuffinReader<S, F>
86where
87 F: PuffinFileAccessor + Clone,
88 S: Stager<FileHandle = F::FileHandle> + 'static,
89{
90 type Blob = Either<RandomReadBlob<F>, S::Blob>;
91 type Dir = S::Dir;
92
93 fn with_file_size_hint(mut self, file_size_hint: Option<u64>) -> Self {
94 self.file_size_hint = file_size_hint;
95 self
96 }
97
98 async fn metadata(&self) -> Result<Arc<FileMetadata>> {
99 let mut file = self.puffin_reader().await?;
100 self.get_puffin_file_metadata(&mut file).await
101 }
102
103 async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>> {
104 let mut file = self.puffin_reader().await?;
105 let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
106 let blob = if blob_metadata.compression_codec.is_none() {
107 Either::L(RandomReadBlob {
109 handle: self.handle.clone(),
110 accessor: self.puffin_file_accessor.clone(),
111 blob_metadata: blob_metadata.clone(),
112 })
113 } else {
114 let blob_metadata = blob_metadata.clone();
116 let staged_blob = self
117 .stager
118 .get_blob(
119 &self.handle,
120 key,
121 Box::new(|writer| {
122 Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
123 }),
124 )
125 .await?;
126
127 Either::R(staged_blob)
128 };
129
130 Ok(GuardWithMetadata::new(blob, blob_metadata))
131 }
132
133 async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
134 let mut file = self.puffin_reader().await?;
135 let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
136 let dir = self
137 .stager
138 .get_dir(
139 &self.handle,
140 key,
141 Box::new(|writer_provider| {
142 let accessor = self.puffin_file_accessor.clone();
143 let handle = self.handle.clone();
144 let blob_metadata = blob_metadata.clone();
145 Box::pin(Self::init_dir_to_stager(
146 file,
147 blob_metadata,
148 handle,
149 writer_provider,
150 accessor,
151 ))
152 }),
153 )
154 .await?;
155
156 Ok(GuardWithMetadata::new(dir, blob_metadata))
157 }
158}
159
160impl<S, F> FsPuffinReader<S, F>
161where
162 S: Stager,
163 F: PuffinFileAccessor + Clone,
164{
165 async fn get_puffin_file_metadata(
166 &self,
167 reader: &mut PuffinFileReader<F::Reader>,
168 ) -> Result<Arc<FileMetadata>> {
169 let id = self.handle.to_string();
170 if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
171 if let Some(metadata) = cache.get_metadata(&id) {
172 return Ok(metadata);
173 }
174 }
175
176 let metadata = Arc::new(reader.metadata().await?);
177 if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
178 cache.put_metadata(id, metadata.clone());
179 }
180 Ok(metadata)
181 }
182
183 async fn get_blob_metadata(
184 &self,
185 key: &str,
186 file: &mut PuffinFileReader<F::Reader>,
187 ) -> Result<BlobMetadata> {
188 let metadata = self.get_puffin_file_metadata(file).await?;
189 let blob_metadata = metadata
190 .blobs
191 .iter()
192 .find(|m| m.blob_type == key)
193 .context(BlobNotFoundSnafu { blob: key })?
194 .clone();
195
196 Ok(blob_metadata)
197 }
198
199 async fn puffin_reader(&self) -> Result<PuffinFileReader<F::Reader>> {
200 let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
201 if let Some(file_size_hint) = self.file_size_hint {
202 reader.with_file_size_hint(file_size_hint);
203 }
204 Ok(PuffinFileReader::new(reader))
205 }
206
207 async fn init_blob_to_stager(
208 reader: PuffinFileReader<F::Reader>,
209 blob_metadata: BlobMetadata,
210 mut writer: BoxWriter,
211 ) -> Result<u64> {
212 let reader = reader.into_blob_reader(&blob_metadata);
213 let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?;
214 let compression = blob_metadata.compression_codec;
215 let size = Self::handle_decompress(reader, &mut writer, compression).await?;
216 Ok(size)
217 }
218
219 async fn init_dir_to_stager(
220 mut file: PuffinFileReader<F::Reader>,
221 blob_metadata: BlobMetadata,
222 handle: F::FileHandle,
223 writer_provider: DirWriterProviderRef,
224 accessor: F,
225 ) -> Result<u64> {
226 let puffin_metadata = file.metadata().await?;
227 let reader = file.blob_reader(&blob_metadata)?;
228 let meta = reader.metadata().await.context(MetadataSnafu)?;
229 let buf = reader
230 .read(0..meta.content_length)
231 .await
232 .context(ReadSnafu)?;
233 let dir_meta: DirMetadata = serde_json::from_slice(&buf).context(DeserializeJsonSnafu)?;
234
235 let mut tasks = vec![];
236 for file_meta in dir_meta.files {
237 let blob_meta = puffin_metadata
238 .blobs
239 .get(file_meta.blob_index)
240 .context(BlobIndexOutOfBoundSnafu {
241 index: file_meta.blob_index,
242 max_index: puffin_metadata.blobs.len(),
243 })?
244 .clone();
245 ensure!(
246 blob_meta.blob_type == file_meta.key,
247 FileKeyNotMatchSnafu {
248 expected: file_meta.key,
249 actual: &blob_meta.blob_type,
250 }
251 );
252
253 let reader = accessor.reader(&handle).await?;
254 let writer = writer_provider.writer(&file_meta.relative_path).await?;
255 let task = common_runtime::spawn_global(async move {
256 let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta);
257 let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?;
258 let compression = blob_meta.compression_codec;
259 let size = Self::handle_decompress(reader, writer, compression).await?;
260 Ok(size)
261 });
262 tasks.push(task);
263 }
264
265 let size = futures::future::try_join_all(tasks.into_iter())
266 .await
267 .into_iter()
268 .flatten()
269 .sum::<Result<_>>()?;
270
271 Ok(size)
272 }
273
274 async fn handle_decompress(
277 reader: impl AsyncRead,
278 mut writer: impl AsyncWrite + Unpin,
279 compression: Option<CompressionCodec>,
280 ) -> Result<u64> {
281 match compression {
282 Some(CompressionCodec::Lz4) => UnsupportedDecompressionSnafu {
283 decompression: "lz4",
284 }
285 .fail(),
286 Some(CompressionCodec::Zstd) => {
287 let reader = ZstdDecoder::new(BufReader::new(reader));
288 futures::io::copy(reader, &mut writer)
289 .await
290 .context(WriteSnafu)
291 }
292 None => futures::io::copy(reader, &mut writer)
293 .await
294 .context(WriteSnafu),
295 }
296 }
297}
298
299pub struct RandomReadBlob<F: PuffinFileAccessor> {
301 handle: F::FileHandle,
302 accessor: F,
303 blob_metadata: BlobMetadata,
304}
305
306#[async_trait]
307impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
308 type Reader = PartialReader<F::Reader>;
309
310 async fn reader(&self) -> Result<Self::Reader> {
311 ensure!(
312 self.blob_metadata.compression_codec.is_none(),
313 UnsupportedDecompressionSnafu {
314 decompression: self.blob_metadata.compression_codec.unwrap().to_string()
315 }
316 );
317
318 let reader = self.accessor.reader(&self.handle).await?;
319 let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
320 Ok(blob_reader)
321 }
322}
323
/// One of two alternatives.
///
/// Used to expose the directly read blob guard and the staged blob guard (and their readers)
/// under a single type.
pub enum Either<Lhs, Rhs> {
    /// The first alternative.
    L(Lhs),
    /// The second alternative.
    R(Rhs),
}
333
334#[async_trait]
335impl<A, B> RangeReader for Either<A, B>
336where
337 A: RangeReader,
338 B: RangeReader,
339{
340 async fn metadata(&self) -> io::Result<Metadata> {
341 match self {
342 Either::L(a) => a.metadata().await,
343 Either::R(b) => b.metadata().await,
344 }
345 }
346 async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
347 match self {
348 Either::L(a) => a.read(range).await,
349 Either::R(b) => b.read(range).await,
350 }
351 }
352 async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
353 match self {
354 Either::L(a) => a.read_into(range, buf).await,
355 Either::R(b) => b.read_into(range, buf).await,
356 }
357 }
358 async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
359 match self {
360 Either::L(a) => a.read_vec(ranges).await,
361 Either::R(b) => b.read_vec(ranges).await,
362 }
363 }
364}
365
366#[async_trait]
367impl<A, B> BlobGuard for Either<A, B>
368where
369 A: BlobGuard + Sync,
370 B: BlobGuard + Sync,
371{
372 type Reader = Either<A::Reader, B::Reader>;
373 async fn reader(&self) -> Result<Self::Reader> {
374 match self {
375 Either::L(a) => Ok(Either::L(a.reader().await?)),
376 Either::R(b) => Ok(Either::R(b.reader().await?)),
377 }
378 }
379}