// puffin/puffin_manager/fs_puffin_manager/writer.rs

use std::collections::{HashMap, HashSet};
use std::path::PathBuf;

use async_compression::futures::bufread::ZstdEncoder;
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncWrite, StreamExt};
use snafu::{ensure, ResultExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use uuid::Uuid;

use crate::blob_metadata::CompressionCodec;
use crate::error::{
    DuplicateBlobSnafu, MetadataSnafu, OpenSnafu, Result, SerializeJsonSnafu,
    UnsupportedCompressionSnafu, WalkDirSnafu,
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};

/// Writer that appends blobs and directories to a puffin file backed by `W`,
/// registering written directories with a [`Stager`].
pub struct FsPuffinWriter<S: Stager, W> {
    /// Stager handle identifying the puffin file being written.
    handle: S::FileHandle,

    /// Stager that receives directories written via `put_dir`.
    stager: S,

    /// Underlying puffin file-format writer.
    puffin_file_writer: PuffinFileWriter<W>,

    /// Keys of blobs already written; used to reject duplicate keys.
    blob_keys: HashSet<String>,
}
50
51impl<S: Stager, W> FsPuffinWriter<S, W> {
52 pub(crate) fn new(handle: S::FileHandle, stager: S, writer: W) -> Self {
53 Self {
54 handle,
55 stager,
56 puffin_file_writer: PuffinFileWriter::new(writer),
57 blob_keys: HashSet::new(),
58 }
59 }
60}
61
#[async_trait]
impl<S, W> PuffinWriter for FsPuffinWriter<S, W>
where
    S: Stager,
    W: AsyncWrite + Unpin + Send,
{
    /// Writes a single blob under `key`, compressing it according to
    /// `options.compression`. Fails with a `DuplicateBlob` error if `key`
    /// was already written. Returns the number of bytes written to the
    /// underlying puffin file.
    async fn put_blob<R>(
        &mut self,
        key: &str,
        raw_data: R,
        options: PutOptions,
        properties: HashMap<String, String>,
    ) -> Result<u64>
    where
        R: AsyncRead + Send,
    {
        // Each key may be written at most once per puffin file.
        ensure!(
            !self.blob_keys.contains(key),
            DuplicateBlobSnafu { blob: key }
        );

        let written_bytes = self
            .handle_compress(key.to_string(), raw_data, options.compression, properties)
            .await?;

        // Record the key only after a successful write.
        self.blob_keys.insert(key.to_string());
        Ok(written_bytes)
    }

    /// Writes a whole directory: every file under `dir_path` becomes its own
    /// blob keyed by a fresh UUID, and a JSON metadata blob written under
    /// `key` maps each file's relative path to its blob. The directory is
    /// also handed to the stager. Returns the total bytes written to the
    /// puffin file.
    async fn put_dir(
        &mut self,
        key: &str,
        dir_path: PathBuf,
        options: PutOptions,
        properties: HashMap<String, String>,
    ) -> Result<u64> {
        // The directory key shares the same uniqueness namespace as blob keys.
        ensure!(
            !self.blob_keys.contains(key),
            DuplicateBlobSnafu { blob: key }
        );

        // Walk the tree; directory entries themselves are filtered out so
        // only files are yielded (async_walkdir still descends into them —
        // `Filtering::Ignore` skips the entry, not the subtree).
        let mut wd = async_walkdir::WalkDir::new(&dir_path).filter(|entry| async move {
            match entry.file_type().await {
                Ok(ft) if ft.is_dir() => async_walkdir::Filtering::Ignore,
                _ => async_walkdir::Filtering::Continue,
            }
        });

        // Uncompressed total of the files, reported to the stager below.
        let mut dir_size = 0;
        // Bytes actually written into the puffin file (post-compression).
        let mut written_bytes = 0;
        let mut files = vec![];
        while let Some(entry) = wd.next().await {
            let entry = entry.context(WalkDirSnafu)?;
            dir_size += entry.metadata().await.context(MetadataSnafu)?.len();

            // `compat()` adapts tokio's AsyncRead to the futures-io reader
            // expected by `handle_compress`.
            let reader = tokio::fs::File::open(entry.path())
                .await
                .context(OpenSnafu)?
                .compat();

            // Each file gets a random, collision-free blob key.
            let file_key = Uuid::new_v4().to_string();
            written_bytes += self
                .handle_compress(
                    file_key.clone(),
                    reader,
                    options.compression,
                    Default::default(),
                )
                .await?;

            let path = entry.path();
            let relative_path = path
                .strip_prefix(&dir_path)
                .expect("entry path is under dir path");

            // Normalize separators so the stored relative path is portable
            // across platforms.
            let unified_rel_path = if cfg!(windows) {
                relative_path.to_string_lossy().replace('\\', "/")
            } else {
                relative_path.to_string_lossy().to_string()
            };

            files.push(DirFileMetadata {
                relative_path: unified_rel_path,
                key: file_key.clone(),
                // Number of blob keys written before this file — presumably
                // the blob's ordinal position in the puffin file; confirm
                // against the reader side.
                blob_index: self.blob_keys.len(),
            });
            self.blob_keys.insert(file_key);
        }

        // The directory metadata itself is stored uncompressed as JSON under
        // the caller-provided key.
        let dir_metadata = DirMetadata { files };
        let encoded = serde_json::to_vec(&dir_metadata).context(SerializeJsonSnafu)?;
        let dir_meta_blob = Blob {
            blob_type: key.to_string(),
            compressed_data: encoded.as_slice(),
            compression_codec: None,
            properties,
        };

        written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;
        self.blob_keys.insert(key.to_string());

        // NOTE(review): registers the original on-disk directory with the
        // stager — presumably so readers can serve it from the staging area
        // without re-extracting; verify against the Stager implementation.
        self.stager
            .put_dir(&self.handle, key, dir_path, dir_size)
            .await?;
        Ok(written_bytes)
    }

    /// Forwards the footer LZ4-compression flag to the file-format writer.
    fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
        self.puffin_file_writer
            .set_footer_lz4_compressed(lz4_compressed);
    }

    /// Finalizes the puffin file and returns the total size reported by the
    /// underlying writer.
    async fn finish(mut self) -> Result<u64> {
        let size = self.puffin_file_writer.finish().await?;
        Ok(size)
    }
}
182
183impl<S, W> FsPuffinWriter<S, W>
184where
185 S: Stager,
186 W: AsyncWrite + Unpin + Send,
187{
188 async fn handle_compress(
190 &mut self,
191 key: String,
192 raw_data: impl AsyncRead + Send,
193 compression: Option<CompressionCodec>,
194 properties: HashMap<String, String>,
195 ) -> Result<u64> {
196 match compression {
197 Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(),
198 Some(CompressionCodec::Zstd) => {
199 let blob = Blob {
200 blob_type: key,
201 compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
202 compression_codec: compression,
203 properties,
204 };
205 self.puffin_file_writer.add_blob(blob).await
206 }
207 None => {
208 let blob = Blob {
209 blob_type: key,
210 compressed_data: raw_data,
211 compression_codec: compression,
212 properties,
213 };
214 self.puffin_file_writer.add_blob(blob).await
215 }
216 }
217 }
218}