mito2/sst/file.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structures to describe metadata of files.

use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;

use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;

/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;

#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    source: uuid::Error,
}

/// Unique id for [SST File].
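///
/// # Example
///
/// An illustrative sketch (not compiled as a doctest) of the round trip between a
/// `FileId` and its string form, mirroring the behavior exercised in the tests at the
/// bottom of this file:
///
/// ```ignore
/// let id = FileId::random();
/// // A `FileId` displays as a plain UUID and can be parsed back from that string.
/// let parsed = FileId::parse_str(&id.to_string()).unwrap();
/// assert_eq!(id, parsed);
/// // The file-name helpers simply append an extension to the UUID string.
/// assert_eq!(format!("{id}.parquet"), id.as_parquet());
/// ```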
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);

impl FileId {
    /// Returns a new, randomly generated [FileId].
    pub fn random() -> FileId {
        FileId(Uuid::new_v4())
    }

    /// Parses a [FileId] from a string.
    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
    }

    /// Appends `.parquet` to the file id to make a complete file name.
    pub fn as_parquet(&self) -> String {
        format!("{}{}", self, ".parquet")
    }

    /// Appends `.puffin` to the file id to make a complete file name.
    pub fn as_puffin(&self) -> String {
        format!("{}{}", self, ".puffin")
    }

    /// Returns the [FileId] as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_bytes()
    }
}

impl From<FileId> for Uuid {
    fn from(value: FileId) -> Self {
        value.0
    }
}

impl fmt::Display for FileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for FileId {
    type Err = ParseIdError;

    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
        FileId::parse_str(s)
    }
}

/// Time range (min and max timestamps) of an SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

/// Checks if two inclusive timestamp ranges overlap with each other.
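///
/// A minimal sketch (not compiled as a doctest) of the inclusive-bound semantics; it
/// assumes the `Timestamp::new_millisecond` constructor from `common_time`:
///
/// ```ignore
/// let r1 = (Timestamp::new_millisecond(0), Timestamp::new_millisecond(10));
/// let r2 = (Timestamp::new_millisecond(10), Timestamp::new_millisecond(20));
/// let r3 = (Timestamp::new_millisecond(11), Timestamp::new_millisecond(20));
/// // Both bounds are inclusive, so ranges that touch at a single point overlap.
/// assert!(overlaps(&r1, &r2));
/// assert!(!overlaps(&r1, &r3));
/// ```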
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}

/// Metadata of an SST file.
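///
/// An illustrative sketch (not compiled as a doctest) of building a `FileMeta` by
/// hand; the values are placeholders and mirror the `create_file_meta` helper in the
/// tests at the bottom of this file:
///
/// ```ignore
/// let meta = FileMeta {
///     region_id: 0.into(),
///     file_id: FileId::random(),
///     time_range: FileTimeRange::default(),
///     level: 0,
///     file_size: 1024,
///     available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
///     index_file_size: 256,
///     num_rows: 100,
///     num_row_groups: 1,
///     sequence: NonZeroU64::new(42),
/// };
/// assert!(meta.exists_index());
/// assert!(meta.inverted_index_available());
/// ```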
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Id of the region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file. Unlike a normal file name, a [FileId] does not carry the extension.
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
    /// Available indexes of the file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any rows,
    /// but instead means the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any row groups,
    /// but instead means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence of this file.
    ///
    /// This is the only sequence associated with the file. It is taken from the max
    /// sequence of the rows when the file was generated.
    pub sequence: Option<NonZeroU64>,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size));
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => write!(f, "None"),
                Some(seq) => write!(f, "{}", seq),
            })
            .finish()
    }
}

/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

impl FileMeta {
    /// Returns true if the file has any available index.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a full-text index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }
}

/// Handle to an SST file.
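///
/// A minimal, illustrative sketch (not compiled as a doctest); `meta` is a [FileMeta]
/// and `purger` stands in for any [FilePurgerRef] implementation:
///
/// ```ignore
/// let handle = FileHandle::new(meta, purger);
/// assert!(!handle.compacting());
/// // Marking the handle as deleted only sets a flag; the purge request is sent
/// // when the last clone of the handle is dropped.
/// handle.mark_deleted();
/// drop(handle);
/// ```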
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    /// Creates a new [FileHandle] from a [FileMeta] and a file purger.
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the file id.
    pub fn file_id(&self) -> FileId {
        self.inner.meta.file_id
    }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, file_dir: &str) -> String {
        location::sst_file_path(file_dir, self.file_id())
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted. The file will be deleted asynchronously when all
    /// handles to it are dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns whether the file is under compaction.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets whether the file is under compaction.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the [FileMeta].
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Returns the size of the SST file.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }
}

/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        if self.deleted.load(Ordering::Relaxed) {
            self.file_purger.send_request(PurgeRequest {
                file_meta: self.meta.clone(),
            });
        }
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_id() {
        let id = FileId::random();
        let uuid_str = id.to_string();
        assert_eq!(id.0.to_string(), uuid_str);

        let parsed = FileId::parse_str(&uuid_str).unwrap();
        assert_eq!(id, parsed);
        let parsed = uuid_str.parse().unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_serialization() {
        let id = FileId::random();
        let json = serde_json::to_string(&id).unwrap();
        assert_eq!(format!("\"{id}\""), json);

        let parsed = serde_json::from_str(&json).unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_as_parquet() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
            id.as_parquet()
        );
    }

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }
}