puffin/file_format/writer/
file.rs1use std::collections::HashMap;
16use std::{io, mem};
17
18use async_trait::async_trait;
19use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
20use snafu::ResultExt;
21
22use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder, CompressionCodec};
23use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
24use crate::file_format::writer::footer::FooterWriter;
25use crate::file_format::writer::{AsyncWriter, Blob, SyncWriter};
26use crate::file_format::MAGIC;
27
28pub struct PuffinFileWriter<W> {
30 writer: W,
32
33 properties: HashMap<String, String>,
35
36 blob_metadata: Vec<BlobMetadata>,
38
39 written_bytes: u64,
41
42 footer_lz4_compressed: bool,
44}
45
46impl<W> PuffinFileWriter<W> {
47 pub fn new(writer: W) -> Self {
48 Self {
49 writer,
50 properties: HashMap::new(),
51 blob_metadata: Vec::new(),
52 written_bytes: 0,
53 footer_lz4_compressed: false,
54 }
55 }
56
57 fn create_blob_metadata(
58 &self,
59 typ: String,
60 compression_codec: Option<CompressionCodec>,
61 properties: HashMap<String, String>,
62 size: u64,
63 ) -> BlobMetadata {
64 BlobMetadataBuilder::default()
65 .blob_type(typ)
66 .properties(properties)
67 .compression_codec(compression_codec)
68 .offset(self.written_bytes as _)
69 .length(size as _)
70 .build()
71 .expect("Required fields are not set")
72 }
73}
74
75impl<W: io::Write> SyncWriter for PuffinFileWriter<W> {
76 fn set_properties(&mut self, properties: HashMap<String, String>) {
77 self.properties = properties;
78 }
79
80 fn add_blob<R: io::Read>(&mut self, mut blob: Blob<R>) -> Result<u64> {
81 self.write_header_if_needed_sync()?;
82
83 let size = io::copy(&mut blob.compressed_data, &mut self.writer).context(WriteSnafu)?;
84
85 let blob_metadata = self.create_blob_metadata(
86 blob.blob_type,
87 blob.compression_codec,
88 blob.properties,
89 size,
90 );
91 self.blob_metadata.push(blob_metadata);
92
93 self.written_bytes += size;
94 Ok(size)
95 }
96
97 fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
98 self.footer_lz4_compressed = lz4_compressed;
99 }
100
101 fn finish(&mut self) -> Result<u64> {
102 self.write_header_if_needed_sync()?;
103 self.write_footer_sync()?;
104 self.writer.flush().context(FlushSnafu)?;
105
106 Ok(self.written_bytes)
107 }
108}
109
110#[async_trait]
111impl<W: AsyncWrite + Unpin + Send> AsyncWriter for PuffinFileWriter<W> {
112 fn set_properties(&mut self, properties: HashMap<String, String>) {
113 self.properties = properties;
114 }
115
116 async fn add_blob<R: AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<u64> {
117 self.write_header_if_needed_async().await?;
118
119 let size = futures::io::copy(blob.compressed_data, &mut self.writer)
120 .await
121 .context(WriteSnafu)?;
122
123 let blob_metadata = self.create_blob_metadata(
124 blob.blob_type,
125 blob.compression_codec,
126 blob.properties,
127 size,
128 );
129 self.blob_metadata.push(blob_metadata);
130
131 self.written_bytes += size;
132 Ok(size)
133 }
134
135 fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
136 self.footer_lz4_compressed = lz4_compressed;
137 }
138
139 async fn finish(&mut self) -> Result<u64> {
140 self.write_header_if_needed_async().await?;
141 self.write_footer_async().await?;
142 self.writer.flush().await.context(FlushSnafu)?;
143 self.writer.close().await.context(CloseSnafu)?;
144
145 Ok(self.written_bytes)
146 }
147}
148
149impl<W: io::Write> PuffinFileWriter<W> {
150 fn write_header_if_needed_sync(&mut self) -> Result<()> {
151 if self.written_bytes == 0 {
152 self.writer.write_all(&MAGIC).context(WriteSnafu)?;
153 self.written_bytes += MAGIC.len() as u64;
154 }
155 Ok(())
156 }
157
158 fn write_footer_sync(&mut self) -> Result<()> {
159 let bytes = FooterWriter::new(
160 mem::take(&mut self.blob_metadata),
161 mem::take(&mut self.properties),
162 self.footer_lz4_compressed,
163 )
164 .into_footer_bytes()?;
165
166 self.writer.write_all(&bytes).context(WriteSnafu)?;
167 self.written_bytes += bytes.len() as u64;
168 Ok(())
169 }
170}
171
172impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
173 async fn write_header_if_needed_async(&mut self) -> Result<()> {
174 if self.written_bytes == 0 {
175 self.writer.write_all(&MAGIC).await.context(WriteSnafu)?;
176 self.written_bytes += MAGIC.len() as u64;
177 }
178 Ok(())
179 }
180
181 async fn write_footer_async(&mut self) -> Result<()> {
182 let bytes = FooterWriter::new(
183 mem::take(&mut self.blob_metadata),
184 mem::take(&mut self.properties),
185 self.footer_lz4_compressed,
186 )
187 .into_footer_bytes()?;
188
189 self.writer.write_all(&bytes).await.context(WriteSnafu)?;
190 self.written_bytes += bytes.len() as u64;
191 Ok(())
192 }
193}