puffin/
blob_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt;
16use std::collections::HashMap;
17
18use derive_builder::Builder;
19use serde::{Deserialize, Serialize};
20
21/// Blob metadata of Puffin
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)]
23#[serde(rename_all = "kebab-case")]
24pub struct BlobMetadata {
25    /// Blob type
26    #[serde(rename = "type")]
27    pub blob_type: String,
28
29    /// For Iceberg, it' list of field IDs the blob was computed for;
30    /// the order of items is used to compute sketches stored in the blob.
31    ///
32    /// For usage outside the context of Iceberg, it can be ignored.
33    #[builder(default)]
34    #[serde(default)]
35    #[serde(rename = "fields")]
36    pub input_fields: Vec<i32>,
37
38    /// For Iceberg, it's ID of the Iceberg table’s snapshot the blob was computed from.
39    ///
40    /// For usage outside the context of Iceberg, it can be ignored.
41    #[builder(default)]
42    #[serde(default)]
43    pub snapshot_id: i64,
44
45    /// For Iceberg, it's sequence number of the Iceberg table’s snapshot the blob was computed from.
46    ///
47    /// For usage outside the context of Iceberg, it can be ignored.
48    #[builder(default)]
49    #[serde(default)]
50    pub sequence_number: i64,
51
52    /// The offset in the file where the blob contents start
53    pub offset: i64,
54
55    /// The length of the blob stored in the file (after compression, if compressed)
56    pub length: i64,
57
58    /// See [`CompressionCodec`]. If omitted, the data is assumed to be uncompressed.
59    #[builder(default)]
60    #[serde(default)]
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub compression_codec: Option<CompressionCodec>,
63
64    /// Storage for arbitrary meta-information about the blob
65    #[builder(default)]
66    #[serde(default)]
67    #[serde(skip_serializing_if = "HashMap::is_empty")]
68    pub properties: HashMap<String, String>,
69}
70
71impl BlobMetadata {
72    /// Calculates the memory usage of the blob metadata in bytes.
73    pub fn memory_usage(&self) -> usize {
74        self.blob_type.len()
75            + self.input_fields.len() * std::mem::size_of::<i32>()
76            + self
77                .properties
78                .iter()
79                .map(|(k, v)| k.len() + v.len())
80                .sum::<usize>()
81            + std::mem::size_of::<Self>()
82    }
83}
84
85/// Compression codec used to compress the blob
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "lowercase")]
88pub enum CompressionCodec {
89    /// Single [LZ4 compression frame](https://github.com/lz4/lz4/blob/77d1b93f72628af7bbde0243b4bba9205c3138d9/doc/lz4_Frame_format.md),
90    /// with content size present
91    Lz4,
92
93    /// Single [Zstandard compression frame](https://github.com/facebook/zstd/blob/8af64f41161f6c2e0ba842006fe238c664a6a437/doc/zstd_compression_format.md#zstandard-frames),
94    /// with content size present
95    Zstd,
96}
97
98impl fmt::Display for CompressionCodec {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        match self {
101            CompressionCodec::Lz4 => write!(f, "lz4"),
102            CompressionCodec::Zstd => write!(f, "zstd"),
103        }
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned property map from borrowed key/value pairs.
    fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Returns the fully populated, LZ4-compressed metadata shared by the
    /// (de)serialization tests, carrying the given `properties`.
    fn sample_metadata(properties: HashMap<String, String>) -> BlobMetadata {
        BlobMetadata {
            blob_type: String::from("test"),
            input_fields: vec![1, 2, 3],
            snapshot_id: 12345,
            sequence_number: 67890,
            offset: 100,
            length: 200,
            compression_codec: Some(CompressionCodec::Lz4),
            properties,
        }
    }

    #[test]
    fn test_blob_metadata_builder() {
        let blob_metadata = BlobMetadataBuilder::default()
            .blob_type("type1".to_string())
            .input_fields(vec![1, 2, 3])
            .snapshot_id(100)
            .sequence_number(200)
            .offset(300)
            .length(400)
            .compression_codec(Some(CompressionCodec::Lz4))
            .properties(props(&[("property1", "value1"), ("property2", "value2")]))
            .build()
            .unwrap();

        assert_eq!("type1", blob_metadata.blob_type);
        assert_eq!(vec![1, 2, 3], blob_metadata.input_fields);
        assert_eq!(100, blob_metadata.snapshot_id);
        assert_eq!(200, blob_metadata.sequence_number);
        assert_eq!(300, blob_metadata.offset);
        assert_eq!(400, blob_metadata.length);
        assert_eq!(Some(CompressionCodec::Lz4), blob_metadata.compression_codec);
        assert_eq!(
            "value1",
            blob_metadata.properties.get("property1").unwrap().as_str()
        );
        assert_eq!(
            "value2",
            blob_metadata.properties.get("property2").unwrap().as_str()
        );
    }

    #[test]
    fn test_blob_metadata_minimal_builder() {
        let blob_metadata = BlobMetadataBuilder::default()
            .blob_type("type1".to_string())
            .offset(300)
            .length(400)
            .build()
            .unwrap();

        assert_eq!("type1", blob_metadata.blob_type);
        assert_eq!(300, blob_metadata.offset);
        assert_eq!(400, blob_metadata.length);

        // Every field marked `#[builder(default)]` must fall back to its
        // `Default` value when it is not set explicitly.
        assert!(blob_metadata.input_fields.is_empty());
        assert_eq!(0, blob_metadata.snapshot_id);
        assert_eq!(0, blob_metadata.sequence_number);
        assert_eq!(None, blob_metadata.compression_codec);
        assert_eq!(0, blob_metadata.properties.len());
    }

    #[test]
    fn test_blob_metadata_missing_field() {
        let blob_metadata = BlobMetadataBuilder::default()
            .blob_type("type1".to_string())
            .offset(300)
            .build();
        assert_eq!(
            blob_metadata.unwrap_err().to_string(),
            "`length` must be initialized"
        );

        let blob_metadata = BlobMetadataBuilder::default()
            .blob_type("type1".to_string())
            .length(400)
            .build();
        assert_eq!(
            blob_metadata.unwrap_err().to_string(),
            "`offset` must be initialized"
        );

        let blob_metadata = BlobMetadataBuilder::default()
            .offset(300)
            .length(400)
            .build();
        assert_eq!(
            blob_metadata.unwrap_err().to_string(),
            "`blob_type` must be initialized"
        );
    }

    #[test]
    fn test_memory_usage() {
        // 4 bytes of blob type, 3 * 4 bytes of field IDs and
        // (4 + 6) * 2 bytes of property keys and values.
        let metadata = sample_metadata(props(&[("key1", "value1"), ("key2", "value2")]));
        let expected =
            4 + 3 * std::mem::size_of::<i32>() + 20 + std::mem::size_of::<BlobMetadata>();
        assert_eq!(expected, metadata.memory_usage());

        // Without any heap data only the inline size of the struct remains.
        let empty = BlobMetadata {
            blob_type: String::new(),
            input_fields: Vec::new(),
            ..sample_metadata(HashMap::new())
        };
        assert_eq!(std::mem::size_of::<BlobMetadata>(), empty.memory_usage());
    }

    #[test]
    fn test_compression_codec_display() {
        assert_eq!("lz4", CompressionCodec::Lz4.to_string());
        assert_eq!("zstd", CompressionCodec::Zstd.to_string());

        // The displayed name must agree with the serialized name.
        for codec in [CompressionCodec::Lz4, CompressionCodec::Zstd] {
            let json = serde_json::to_string(&codec).unwrap();
            assert_eq!(format!("\"{}\"", codec), json);
        }
    }

    #[test]
    fn test_serialize_deserialize_blob_metadata_with_properties() {
        let properties = props(&[("key1", "value1"), ("key2", "value2")]);
        let metadata = sample_metadata(properties.clone());

        let json = serde_json::to_string(&metadata).unwrap();
        let deserialized: BlobMetadata = serde_json::from_str(&json).unwrap();

        assert_eq!(metadata, deserialized);
        assert_eq!(properties, deserialized.properties);
    }

    #[test]
    fn test_serialize_deserialize_blob_metadata_without_compression_codec() {
        let metadata = BlobMetadata {
            compression_codec: None,
            ..sample_metadata(HashMap::new())
        };

        // Neither the absent codec nor the empty property map may show up in
        // the serialized form.
        let expected_json = r#"{"type":"test","fields":[1,2,3],"snapshot-id":12345,"sequence-number":67890,"offset":100,"length":200}"#;

        let json = serde_json::to_string(&metadata).unwrap();
        let deserialized: BlobMetadata = serde_json::from_str(&json).unwrap();

        assert_eq!(expected_json, json);
        assert_eq!(metadata, deserialized);
    }

    #[test]
    fn test_deserialize_blob_metadata_with_properties() {
        let json = r#"{
            "type": "test",
            "fields": [1, 2, 3],
            "snapshot-id": 12345,
            "sequence-number": 67890,
            "offset": 100,
            "length": 200,
            "compression-codec": "lz4",
            "properties": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

        let expected_properties = props(&[("key1", "value1"), ("key2", "value2")]);
        let expected = sample_metadata(expected_properties.clone());

        let deserialized: BlobMetadata = serde_json::from_str(json).unwrap();

        assert_eq!(expected, deserialized);
        assert_eq!(expected_properties, deserialized.properties);
    }

    #[test]
    fn test_deserialize_blob_metadata_without_properties() {
        let json = r#"{
            "type": "test",
            "fields": [1, 2, 3],
            "snapshot-id": 12345,
            "sequence-number": 67890,
            "offset": 100,
            "length": 200,
            "compression-codec": "lz4"
        }"#;

        let expected = sample_metadata(HashMap::new());

        let deserialized: BlobMetadata = serde_json::from_str(json).unwrap();

        assert_eq!(expected, deserialized);
    }

    #[test]
    fn test_deserialize_blob_metadata_with_empty_properties() {
        let json = r#"{
            "type": "test",
            "fields": [1, 2, 3],
            "snapshot-id": 12345,
            "sequence-number": 67890,
            "offset": 100,
            "length": 200,
            "compression-codec": "lz4",
            "properties": {}
        }"#;

        let expected_properties = HashMap::new();
        let expected = sample_metadata(expected_properties.clone());

        let deserialized: BlobMetadata = serde_json::from_str(json).unwrap();

        assert_eq!(expected, deserialized);
        assert_eq!(expected_properties, deserialized.properties);
    }

    #[test]
    fn test_deserialize_invalid_blob_metadata() {
        // Each document has exactly one problem, so a regression in any single
        // validation cannot hide behind another failure.
        let invalid_documents = [
            // `snapshot-id` must be a JSON number, not a string.
            r#"{"type": "test", "snapshot-id": "12345", "offset": 100, "length": 200}"#,
            // Unknown compression codec name.
            r#"{"type": "test", "offset": 100, "length": 200, "compression-codec": "Invalid"}"#,
            // Codec names are matched case-sensitively.
            r#"{"type": "test", "offset": 100, "length": 200, "compression-codec": "LZ4"}"#,
            // Required `type` is missing.
            r#"{"offset": 100, "length": 200}"#,
            // Required `offset` is missing.
            r#"{"type": "test", "length": 200}"#,
            // Required `length` is missing.
            r#"{"type": "test", "offset": 100}"#,
            // Several problems at once.
            r#"{
                "type": "test",
                "input-fields": [1, 2, 3],
                "snapshot-id": "12345",
                "sequence-number": 67890,
                "offset": 100,
                "length": 200,
                "compression-codec": "Invalid",
                "properties": {}
            }"#,
        ];

        for json in invalid_documents {
            assert!(
                serde_json::from_str::<BlobMetadata>(json).is_err(),
                "expected deserialization to fail for {}",
                json
            );
        }
    }
}