puffin/puffin_manager/fs_puffin_manager/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::path::PathBuf;

use async_compression::futures::bufread::ZstdEncoder;
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncWrite, StreamExt};
use snafu::{ensure, ResultExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use uuid::Uuid;

use crate::blob_metadata::CompressionCodec;
use crate::error::{
    DuplicateBlobSnafu, MetadataSnafu, OpenSnafu, Result, SerializeJsonSnafu,
    UnsupportedCompressionSnafu, WalkDirSnafu,
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};

/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
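///
/// A minimal usage sketch (illustrative only): it assumes a `Stager`
/// implementation, an async writer, and pre-built `PutOptions` values;
/// the variable names below are placeholders, not crate API.
///
/// ```ignore
/// let mut writer = FsPuffinWriter::new(handle, stager, file_writer);
/// writer
///     .put_blob("my-blob", blob_reader, blob_opts, HashMap::new())
///     .await?;
/// writer
///     .put_dir("my-dir", dir_path, dir_opts, HashMap::new())
///     .await?;
/// let file_size = writer.finish().await?;
/// ```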
pub struct FsPuffinWriter<S: Stager, W> {
    /// The handle of the puffin file.
    handle: S::FileHandle,

    /// The stager.
    stager: S,

    /// The underlying `PuffinFileWriter`.
    puffin_file_writer: PuffinFileWriter<W>,

    /// Keys of the blobs written so far, used to reject duplicate keys and to
    /// derive blob indices for directory metadata.
    blob_keys: HashSet<String>,
}

impl<S: Stager, W> FsPuffinWriter<S, W> {
    pub(crate) fn new(handle: S::FileHandle, stager: S, writer: W) -> Self {
        Self {
            handle,
            stager,
            puffin_file_writer: PuffinFileWriter::new(writer),
            blob_keys: HashSet::new(),
        }
    }
}

#[async_trait]
impl<S, W> PuffinWriter for FsPuffinWriter<S, W>
where
    S: Stager,
    W: AsyncWrite + Unpin + Send,
{
    async fn put_blob<R>(
        &mut self,
        key: &str,
        raw_data: R,
        options: PutOptions,
        properties: HashMap<String, String>,
    ) -> Result<u64>
    where
        R: AsyncRead + Send,
    {
        ensure!(
            !self.blob_keys.contains(key),
            DuplicateBlobSnafu { blob: key }
        );

        let written_bytes = self
            .handle_compress(key.to_string(), raw_data, options.compression, properties)
            .await?;

        self.blob_keys.insert(key.to_string());
        Ok(written_bytes)
    }

    async fn put_dir(
        &mut self,
        key: &str,
        dir_path: PathBuf,
        options: PutOptions,
        properties: HashMap<String, String>,
    ) -> Result<u64> {
        ensure!(
            !self.blob_keys.contains(key),
            DuplicateBlobSnafu { blob: key }
        );

        // Walk the directory and add all files to the puffin file.
        let mut wd = async_walkdir::WalkDir::new(&dir_path).filter(|entry| async move {
            match entry.file_type().await {
                // Ignore directories.
                Ok(ft) if ft.is_dir() => async_walkdir::Filtering::Ignore,
                _ => async_walkdir::Filtering::Continue,
            }
        });
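
        // Each regular file below is written as its own blob under a fresh UUID
        // key; its relative path is recorded in the directory metadata blob
        // appended after the walk.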

        let mut dir_size = 0;
        let mut written_bytes = 0;
        let mut files = vec![];
        while let Some(entry) = wd.next().await {
            let entry = entry.context(WalkDirSnafu)?;
            dir_size += entry.metadata().await.context(MetadataSnafu)?.len();

            let reader = tokio::fs::File::open(entry.path())
                .await
                .context(OpenSnafu)?
                .compat();

            let file_key = Uuid::new_v4().to_string();
            written_bytes += self
                .handle_compress(
                    file_key.clone(),
                    reader,
                    options.compression,
                    Default::default(),
                )
                .await?;

            let path = entry.path();
            let relative_path = path
                .strip_prefix(&dir_path)
                .expect("entry path is under dir path");

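            // Normalize path separators to `/` so the directory metadata is platform-independent.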
            let unified_rel_path = if cfg!(windows) {
                relative_path.to_string_lossy().replace('\\', "/")
            } else {
                relative_path.to_string_lossy().to_string()
            };

            files.push(DirFileMetadata {
                relative_path: unified_rel_path,
                key: file_key.clone(),
                blob_index: self.blob_keys.len(),
            });
            self.blob_keys.insert(file_key);
        }

        let dir_metadata = DirMetadata { files };
        let encoded = serde_json::to_vec(&dir_metadata).context(SerializeJsonSnafu)?;
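        // With default serde field names (an assumption; see `dir_meta.rs`), the
        // encoded metadata looks roughly like:
        // {"files":[{"relative_path":"a/b.txt","key":"<uuid>","blob_index":0},...]}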
        let dir_meta_blob = Blob {
            blob_type: key.to_string(),
            compressed_data: encoded.as_slice(),
            compression_codec: None,
            properties,
        };

        written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;
        self.blob_keys.insert(key.to_string());

        // Move the directory into the stager.
        self.stager
            .put_dir(&self.handle, key, dir_path, dir_size)
            .await?;
        Ok(written_bytes)
    }

    fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
        self.puffin_file_writer
            .set_footer_lz4_compressed(lz4_compressed);
    }

    async fn finish(mut self) -> Result<u64> {
        let size = self.puffin_file_writer.finish().await?;
        Ok(size)
    }
}

impl<S, W> FsPuffinWriter<S, W>
where
    S: Stager,
    W: AsyncWrite + Unpin + Send,
{
    /// Compresses the raw data and writes it to the puffin file.
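    ///
    /// Lz4 is rejected as unsupported; Zstd input is streamed through a
    /// `ZstdEncoder`, and `None` writes the data as-is.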
    async fn handle_compress(
        &mut self,
        key: String,
        raw_data: impl AsyncRead + Send,
        compression: Option<CompressionCodec>,
        properties: HashMap<String, String>,
    ) -> Result<u64> {
        match compression {
            Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(),
            Some(CompressionCodec::Zstd) => {
                let blob = Blob {
                    blob_type: key,
                    compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
                    compression_codec: compression,
                    properties,
                };
                self.puffin_file_writer.add_blob(blob).await
            }
            None => {
                let blob = Blob {
                    blob_type: key,
                    compressed_data: raw_data,
                    compression_codec: compression,
                    properties,
                };
                self.puffin_file_writer.add_blob(blob).await
            }
        }
    }
}