puffin/puffin_manager/fs_puffin_manager/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16use std::ops::Range;
17use std::sync::Arc;
18
19use async_compression::futures::bufread::ZstdDecoder;
20use async_trait::async_trait;
21use bytes::{BufMut, Bytes};
22use common_base::range_read::{AsyncReadAdapter, Metadata, RangeReader, SizeAwareRangeReader};
23use futures::io::BufReader;
24use futures::{AsyncRead, AsyncWrite};
25use snafu::{ensure, OptionExt, ResultExt};
26
27use crate::blob_metadata::{BlobMetadata, CompressionCodec};
28use crate::error::{
29    BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
30    MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
31};
32use crate::file_format::reader::{AsyncReader, PuffinFileReader};
33use crate::file_metadata::FileMetadata;
34use crate::partial_reader::PartialReader;
35use crate::puffin_manager::file_accessor::PuffinFileAccessor;
36use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
37use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
38use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
39use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
40
41/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
42pub struct FsPuffinReader<S, F>
43where
44    S: Stager + 'static,
45    F: PuffinFileAccessor + Clone,
46{
47    /// The handle of the puffin file.
48    handle: F::FileHandle,
49
50    /// The file size hint.
51    file_size_hint: Option<u64>,
52
53    /// The stager.
54    stager: S,
55
56    /// The puffin file accessor.
57    puffin_file_accessor: F,
58
59    /// The puffin file metadata cache.
60    puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
61}
62
63impl<S, F> FsPuffinReader<S, F>
64where
65    S: Stager + 'static,
66    F: PuffinFileAccessor + Clone,
67{
68    pub(crate) fn new(
69        handle: F::FileHandle,
70        stager: S,
71        puffin_file_accessor: F,
72        puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
73    ) -> Self {
74        Self {
75            handle,
76            file_size_hint: None,
77            stager,
78            puffin_file_accessor,
79            puffin_file_metadata_cache,
80        }
81    }
82}
83
84#[async_trait]
85impl<S, F> PuffinReader for FsPuffinReader<S, F>
86where
87    F: PuffinFileAccessor + Clone,
88    S: Stager<FileHandle = F::FileHandle> + 'static,
89{
90    type Blob = Either<RandomReadBlob<F>, S::Blob>;
91    type Dir = S::Dir;
92
93    fn with_file_size_hint(mut self, file_size_hint: Option<u64>) -> Self {
94        self.file_size_hint = file_size_hint;
95        self
96    }
97
98    async fn metadata(&self) -> Result<Arc<FileMetadata>> {
99        let mut file = self.puffin_reader().await?;
100        self.get_puffin_file_metadata(&mut file).await
101    }
102
103    async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>> {
104        let mut file = self.puffin_reader().await?;
105        let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
106        let blob = if blob_metadata.compression_codec.is_none() {
107            // If the blob is not compressed, we can directly read it from the puffin file.
108            Either::L(RandomReadBlob {
109                handle: self.handle.clone(),
110                accessor: self.puffin_file_accessor.clone(),
111                blob_metadata: blob_metadata.clone(),
112            })
113        } else {
114            // If the blob is compressed, we need to decompress it into staging space before reading.
115            let blob_metadata = blob_metadata.clone();
116            let staged_blob = self
117                .stager
118                .get_blob(
119                    &self.handle,
120                    key,
121                    Box::new(|writer| {
122                        Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
123                    }),
124                )
125                .await?;
126
127            Either::R(staged_blob)
128        };
129
130        Ok(GuardWithMetadata::new(blob, blob_metadata))
131    }
132
133    async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
134        let mut file = self.puffin_reader().await?;
135        let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
136        let dir = self
137            .stager
138            .get_dir(
139                &self.handle,
140                key,
141                Box::new(|writer_provider| {
142                    let accessor = self.puffin_file_accessor.clone();
143                    let handle = self.handle.clone();
144                    let blob_metadata = blob_metadata.clone();
145                    Box::pin(Self::init_dir_to_stager(
146                        file,
147                        blob_metadata,
148                        handle,
149                        writer_provider,
150                        accessor,
151                    ))
152                }),
153            )
154            .await?;
155
156        Ok(GuardWithMetadata::new(dir, blob_metadata))
157    }
158}
159
160impl<S, F> FsPuffinReader<S, F>
161where
162    S: Stager,
163    F: PuffinFileAccessor + Clone,
164{
165    async fn get_puffin_file_metadata(
166        &self,
167        reader: &mut PuffinFileReader<F::Reader>,
168    ) -> Result<Arc<FileMetadata>> {
169        let id = self.handle.to_string();
170        if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
171            if let Some(metadata) = cache.get_metadata(&id) {
172                return Ok(metadata);
173            }
174        }
175
176        let metadata = Arc::new(reader.metadata().await?);
177        if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
178            cache.put_metadata(id, metadata.clone());
179        }
180        Ok(metadata)
181    }
182
183    async fn get_blob_metadata(
184        &self,
185        key: &str,
186        file: &mut PuffinFileReader<F::Reader>,
187    ) -> Result<BlobMetadata> {
188        let metadata = self.get_puffin_file_metadata(file).await?;
189        let blob_metadata = metadata
190            .blobs
191            .iter()
192            .find(|m| m.blob_type == key)
193            .context(BlobNotFoundSnafu { blob: key })?
194            .clone();
195
196        Ok(blob_metadata)
197    }
198
199    async fn puffin_reader(&self) -> Result<PuffinFileReader<F::Reader>> {
200        let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
201        if let Some(file_size_hint) = self.file_size_hint {
202            reader.with_file_size_hint(file_size_hint);
203        }
204        Ok(PuffinFileReader::new(reader))
205    }
206
207    async fn init_blob_to_stager(
208        reader: PuffinFileReader<F::Reader>,
209        blob_metadata: BlobMetadata,
210        mut writer: BoxWriter,
211    ) -> Result<u64> {
212        let reader = reader.into_blob_reader(&blob_metadata);
213        let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?;
214        let compression = blob_metadata.compression_codec;
215        let size = Self::handle_decompress(reader, &mut writer, compression).await?;
216        Ok(size)
217    }
218
219    async fn init_dir_to_stager(
220        mut file: PuffinFileReader<F::Reader>,
221        blob_metadata: BlobMetadata,
222        handle: F::FileHandle,
223        writer_provider: DirWriterProviderRef,
224        accessor: F,
225    ) -> Result<u64> {
226        let puffin_metadata = file.metadata().await?;
227        let reader = file.blob_reader(&blob_metadata)?;
228        let meta = reader.metadata().await.context(MetadataSnafu)?;
229        let buf = reader
230            .read(0..meta.content_length)
231            .await
232            .context(ReadSnafu)?;
233        let dir_meta: DirMetadata = serde_json::from_slice(&buf).context(DeserializeJsonSnafu)?;
234
235        let mut tasks = vec![];
236        for file_meta in dir_meta.files {
237            let blob_meta = puffin_metadata
238                .blobs
239                .get(file_meta.blob_index)
240                .context(BlobIndexOutOfBoundSnafu {
241                    index: file_meta.blob_index,
242                    max_index: puffin_metadata.blobs.len(),
243                })?
244                .clone();
245            ensure!(
246                blob_meta.blob_type == file_meta.key,
247                FileKeyNotMatchSnafu {
248                    expected: file_meta.key,
249                    actual: &blob_meta.blob_type,
250                }
251            );
252
253            let reader = accessor.reader(&handle).await?;
254            let writer = writer_provider.writer(&file_meta.relative_path).await?;
255            let task = common_runtime::spawn_global(async move {
256                let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta);
257                let reader = AsyncReadAdapter::new(reader).await.context(MetadataSnafu)?;
258                let compression = blob_meta.compression_codec;
259                let size = Self::handle_decompress(reader, writer, compression).await?;
260                Ok(size)
261            });
262            tasks.push(task);
263        }
264
265        let size = futures::future::try_join_all(tasks.into_iter())
266            .await
267            .into_iter()
268            .flatten()
269            .sum::<Result<_>>()?;
270
271        Ok(size)
272    }
273
274    /// Handles the decompression of the reader and writes the decompressed data to the writer.
275    /// Returns the number of bytes written.
276    async fn handle_decompress(
277        reader: impl AsyncRead,
278        mut writer: impl AsyncWrite + Unpin,
279        compression: Option<CompressionCodec>,
280    ) -> Result<u64> {
281        match compression {
282            Some(CompressionCodec::Lz4) => UnsupportedDecompressionSnafu {
283                decompression: "lz4",
284            }
285            .fail(),
286            Some(CompressionCodec::Zstd) => {
287                let reader = ZstdDecoder::new(BufReader::new(reader));
288                futures::io::copy(reader, &mut writer)
289                    .await
290                    .context(WriteSnafu)
291            }
292            None => futures::io::copy(reader, &mut writer)
293                .await
294                .context(WriteSnafu),
295        }
296    }
297}
298
299/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
300pub struct RandomReadBlob<F: PuffinFileAccessor> {
301    handle: F::FileHandle,
302    accessor: F,
303    blob_metadata: BlobMetadata,
304}
305
306#[async_trait]
307impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
308    type Reader = PartialReader<F::Reader>;
309
310    async fn reader(&self) -> Result<Self::Reader> {
311        ensure!(
312            self.blob_metadata.compression_codec.is_none(),
313            UnsupportedDecompressionSnafu {
314                decompression: self.blob_metadata.compression_codec.unwrap().to_string()
315            }
316        );
317
318        let reader = self.accessor.reader(&self.handle).await?;
319        let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
320        Ok(blob_reader)
321    }
322}
323
/// A value that is one of two alternatives, `A` (left) or `B` (right).
///
/// It exists so that a single type can stand for either of two readers or blob guards.
/// When both sides implement them, `Either` implements `RangeReader` and `BlobGuard`
/// by forwarding each call to whichever variant it holds.
pub enum Either<A, B> {
    /// The left alternative.
    L(A),
    /// The right alternative.
    R(B),
}
333
334#[async_trait]
335impl<A, B> RangeReader for Either<A, B>
336where
337    A: RangeReader,
338    B: RangeReader,
339{
340    async fn metadata(&self) -> io::Result<Metadata> {
341        match self {
342            Either::L(a) => a.metadata().await,
343            Either::R(b) => b.metadata().await,
344        }
345    }
346    async fn read(&self, range: Range<u64>) -> io::Result<Bytes> {
347        match self {
348            Either::L(a) => a.read(range).await,
349            Either::R(b) => b.read(range).await,
350        }
351    }
352    async fn read_into(&self, range: Range<u64>, buf: &mut (impl BufMut + Send)) -> io::Result<()> {
353        match self {
354            Either::L(a) => a.read_into(range, buf).await,
355            Either::R(b) => b.read_into(range, buf).await,
356        }
357    }
358    async fn read_vec(&self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
359        match self {
360            Either::L(a) => a.read_vec(ranges).await,
361            Either::R(b) => b.read_vec(ranges).await,
362        }
363    }
364}
365
366#[async_trait]
367impl<A, B> BlobGuard for Either<A, B>
368where
369    A: BlobGuard + Sync,
370    B: BlobGuard + Sync,
371{
372    type Reader = Either<A::Reader, B::Reader>;
373    async fn reader(&self) -> Result<Self::Reader> {
374        match self {
375            Either::L(a) => Ok(Either::L(a.reader().await?)),
376            Either::R(b) => Ok(Either::R(b.reader().await?)),
377        }
378    }
379}