puffin/file_format/writer/file.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::{io, mem};
17
18use async_trait::async_trait;
19use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
20use snafu::ResultExt;
21
22use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder, CompressionCodec};
23use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
24use crate::file_format::writer::footer::FooterWriter;
25use crate::file_format::writer::{AsyncWriter, Blob, SyncWriter};
26use crate::file_format::MAGIC;
27
28/// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`]
29pub struct PuffinFileWriter<W> {
30    /// The writer to write to.
31    writer: W,
32
33    /// The properties of the file.
34    properties: HashMap<String, String>,
35
36    /// The metadata of the blobs.
37    blob_metadata: Vec<BlobMetadata>,
38
39    /// The number of bytes written.
40    written_bytes: u64,
41
42    /// Whether the footer payload should be LZ4 compressed.
43    footer_lz4_compressed: bool,
44}
45
46impl<W> PuffinFileWriter<W> {
47    pub fn new(writer: W) -> Self {
48        Self {
49            writer,
50            properties: HashMap::new(),
51            blob_metadata: Vec::new(),
52            written_bytes: 0,
53            footer_lz4_compressed: false,
54        }
55    }
56
57    fn create_blob_metadata(
58        &self,
59        typ: String,
60        compression_codec: Option<CompressionCodec>,
61        properties: HashMap<String, String>,
62        size: u64,
63    ) -> BlobMetadata {
64        BlobMetadataBuilder::default()
65            .blob_type(typ)
66            .properties(properties)
67            .compression_codec(compression_codec)
68            .offset(self.written_bytes as _)
69            .length(size as _)
70            .build()
71            .expect("Required fields are not set")
72    }
73}
74
75impl<W: io::Write> SyncWriter for PuffinFileWriter<W> {
76    fn set_properties(&mut self, properties: HashMap<String, String>) {
77        self.properties = properties;
78    }
79
80    fn add_blob<R: io::Read>(&mut self, mut blob: Blob<R>) -> Result<u64> {
81        self.write_header_if_needed_sync()?;
82
83        let size = io::copy(&mut blob.compressed_data, &mut self.writer).context(WriteSnafu)?;
84
85        let blob_metadata = self.create_blob_metadata(
86            blob.blob_type,
87            blob.compression_codec,
88            blob.properties,
89            size,
90        );
91        self.blob_metadata.push(blob_metadata);
92
93        self.written_bytes += size;
94        Ok(size)
95    }
96
97    fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
98        self.footer_lz4_compressed = lz4_compressed;
99    }
100
101    fn finish(&mut self) -> Result<u64> {
102        self.write_header_if_needed_sync()?;
103        self.write_footer_sync()?;
104        self.writer.flush().context(FlushSnafu)?;
105
106        Ok(self.written_bytes)
107    }
108}
109
110#[async_trait]
111impl<W: AsyncWrite + Unpin + Send> AsyncWriter for PuffinFileWriter<W> {
112    fn set_properties(&mut self, properties: HashMap<String, String>) {
113        self.properties = properties;
114    }
115
116    async fn add_blob<R: AsyncRead + Send>(&mut self, blob: Blob<R>) -> Result<u64> {
117        self.write_header_if_needed_async().await?;
118
119        let size = futures::io::copy(blob.compressed_data, &mut self.writer)
120            .await
121            .context(WriteSnafu)?;
122
123        let blob_metadata = self.create_blob_metadata(
124            blob.blob_type,
125            blob.compression_codec,
126            blob.properties,
127            size,
128        );
129        self.blob_metadata.push(blob_metadata);
130
131        self.written_bytes += size;
132        Ok(size)
133    }
134
135    fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool) {
136        self.footer_lz4_compressed = lz4_compressed;
137    }
138
139    async fn finish(&mut self) -> Result<u64> {
140        self.write_header_if_needed_async().await?;
141        self.write_footer_async().await?;
142        self.writer.flush().await.context(FlushSnafu)?;
143        self.writer.close().await.context(CloseSnafu)?;
144
145        Ok(self.written_bytes)
146    }
147}
148
149impl<W: io::Write> PuffinFileWriter<W> {
150    fn write_header_if_needed_sync(&mut self) -> Result<()> {
151        if self.written_bytes == 0 {
152            self.writer.write_all(&MAGIC).context(WriteSnafu)?;
153            self.written_bytes += MAGIC.len() as u64;
154        }
155        Ok(())
156    }
157
158    fn write_footer_sync(&mut self) -> Result<()> {
159        let bytes = FooterWriter::new(
160            mem::take(&mut self.blob_metadata),
161            mem::take(&mut self.properties),
162            self.footer_lz4_compressed,
163        )
164        .into_footer_bytes()?;
165
166        self.writer.write_all(&bytes).context(WriteSnafu)?;
167        self.written_bytes += bytes.len() as u64;
168        Ok(())
169    }
170}
171
172impl<W: AsyncWrite + Unpin> PuffinFileWriter<W> {
173    async fn write_header_if_needed_async(&mut self) -> Result<()> {
174        if self.written_bytes == 0 {
175            self.writer.write_all(&MAGIC).await.context(WriteSnafu)?;
176            self.written_bytes += MAGIC.len() as u64;
177        }
178        Ok(())
179    }
180
181    async fn write_footer_async(&mut self) -> Result<()> {
182        let bytes = FooterWriter::new(
183            mem::take(&mut self.blob_metadata),
184            mem::take(&mut self.properties),
185            self.footer_lz4_compressed,
186        )
187        .into_footer_bytes()?;
188
189        self.writer.write_all(&bytes).await.context(WriteSnafu)?;
190        self.written_bytes += bytes.len() as u64;
191        Ok(())
192    }
193}