// mito2/sst/file.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structures to describe metadata of files.
16
17use std::fmt;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use common_time::Timestamp;
24use serde::{Deserialize, Serialize};
25use smallvec::SmallVec;
26use snafu::{ResultExt, Snafu};
27use store_api::storage::RegionId;
28use uuid::Uuid;
29
30use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
31use crate::sst::location;
32
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
// NOTE(review): level values presumably fall within `0..=MAX_LEVEL` — confirm against compaction code.
pub const MAX_LEVEL: Level = 2;
37
/// Error returned when a [FileId] cannot be parsed from a UUID string.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    /// The underlying uuid parsing error.
    source: uuid::Error,
}
42
/// Unique id for [SST File].
///
/// Wraps a UUID; the on-disk file name is this id plus an extension
/// (see [FileId::as_parquet] / [FileId::as_puffin]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);
46
47impl FileId {
48    /// Returns a new unique [FileId] randomly.
49    pub fn random() -> FileId {
50        FileId(Uuid::new_v4())
51    }
52
53    /// Parses id from string.
54    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
55        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
56    }
57
58    /// Append `.parquet` to file id to make a complete file name
59    pub fn as_parquet(&self) -> String {
60        format!("{}{}", self, ".parquet")
61    }
62
63    /// Append `.puffin` to file id to make a complete file name
64    pub fn as_puffin(&self) -> String {
65        format!("{}{}", self, ".puffin")
66    }
67
68    /// Converts [FileId] as byte slice.
69    pub fn as_bytes(&self) -> &[u8] {
70        self.0.as_bytes()
71    }
72}
73
74impl From<FileId> for Uuid {
75    fn from(value: FileId) -> Self {
76        value.0
77    }
78}
79
80impl fmt::Display for FileId {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        write!(f, "{}", self.0)
83    }
84}
85
86impl FromStr for FileId {
87    type Err = ParseIdError;
88
89    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
90        FileId::parse_str(s)
91    }
92}
93
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
/// The timestamps share the time unit of the data stored in the SST.
pub type FileTimeRange = (Timestamp, Timestamp);
97
98/// Checks if two inclusive timestamp ranges overlap with each other.
99pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
100    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
101    let (_, l_end) = l;
102    let (r_start, _) = r;
103
104    r_start <= l_end
105}
106
/// Metadata of a SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region of file.
    pub region_id: RegionId,
    /// Id of the file. Compared to normal file names, a [FileId] omits the
    /// extension (see [FileId::as_parquet] / [FileId::as_puffin]).
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
    /// Available indexes of the file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file contains no rows,
    /// but instead means the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file contains no row groups,
    /// but instead means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence in this file.
    ///
    /// This sequence is the only sequence in this file. And it's retrieved from the max
    /// sequence of the rows on generating this file.
    pub sequence: Option<NonZeroU64>,
}
144
/// Type of index that may be attached to a SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}
155
156impl FileMeta {
157    pub fn exists_index(&self) -> bool {
158        !self.available_indexes.is_empty()
159    }
160
161    /// Returns true if the file has an inverted index
162    pub fn inverted_index_available(&self) -> bool {
163        self.available_indexes.contains(&IndexType::InvertedIndex)
164    }
165
166    /// Returns true if the file has a fulltext index
167    pub fn fulltext_index_available(&self) -> bool {
168        self.available_indexes.contains(&IndexType::FulltextIndex)
169    }
170
171    /// Returns true if the file has a bloom filter index.
172    pub fn bloom_filter_index_available(&self) -> bool {
173        self.available_indexes
174            .contains(&IndexType::BloomFilterIndex)
175    }
176
177    pub fn index_file_size(&self) -> u64 {
178        self.index_file_size
179    }
180}
181
/// Handle to a SST file.
///
/// Cloning a handle is cheap (it bumps an [Arc] refcount). When the last
/// handle of a file that was marked deleted is dropped, the file is sent to
/// the purger for asynchronous removal (see the `Drop` impl of the inner type).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
187
188impl fmt::Debug for FileHandle {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        f.debug_struct("FileHandle")
191            .field("region_id", &self.inner.meta.region_id)
192            .field("file_id", &self.inner.meta.file_id)
193            .field("time_range", &self.inner.meta.time_range)
194            .field("size", &self.inner.meta.file_size)
195            .field("level", &self.inner.meta.level)
196            .field("compacting", &self.inner.compacting)
197            .field("deleted", &self.inner.deleted)
198            .finish()
199    }
200}
201
202impl FileHandle {
203    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
204        FileHandle {
205            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
206        }
207    }
208
209    /// Returns the region id of the file.
210    pub fn region_id(&self) -> RegionId {
211        self.inner.meta.region_id
212    }
213
214    /// Returns the file id.
215    pub fn file_id(&self) -> FileId {
216        self.inner.meta.file_id
217    }
218
219    /// Returns the complete file path of the file.
220    pub fn file_path(&self, file_dir: &str) -> String {
221        location::sst_file_path(file_dir, self.file_id())
222    }
223
224    /// Returns the time range of the file.
225    pub fn time_range(&self) -> FileTimeRange {
226        self.inner.meta.time_range
227    }
228
229    /// Mark the file as deleted and will delete it on drop asynchronously
230    pub fn mark_deleted(&self) {
231        self.inner.deleted.store(true, Ordering::Relaxed);
232    }
233
234    pub fn compacting(&self) -> bool {
235        self.inner.compacting.load(Ordering::Relaxed)
236    }
237
238    pub fn set_compacting(&self, compacting: bool) {
239        self.inner.compacting.store(compacting, Ordering::Relaxed);
240    }
241
242    /// Returns a reference to the [FileMeta].
243    pub fn meta_ref(&self) -> &FileMeta {
244        &self.inner.meta
245    }
246
247    pub fn size(&self) -> u64 {
248        self.inner.meta.file_size
249    }
250
251    pub fn index_size(&self) -> u64 {
252        self.inner.meta.index_file_size
253    }
254
255    pub fn num_rows(&self) -> usize {
256        self.inner.meta.num_rows as usize
257    }
258}
259
/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    /// Immutable metadata of the file.
    meta: FileMeta,
    /// Whether the file is currently part of a compaction.
    compacting: AtomicBool,
    /// Whether the file has been marked deleted; checked on drop to trigger purging.
    deleted: AtomicBool,
    /// Purger that receives the removal request when a deleted file is dropped.
    file_purger: FilePurgerRef,
}
269
270impl Drop for FileHandleInner {
271    fn drop(&mut self) {
272        if self.deleted.load(Ordering::Relaxed) {
273            self.file_purger.send_request(PurgeRequest {
274                file_meta: self.meta.clone(),
275            });
276        }
277    }
278}
279
280impl FileHandleInner {
281    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
282        FileHandleInner {
283            meta,
284            compacting: AtomicBool::new(false),
285            deleted: AtomicBool::new(false),
286            file_purger,
287        }
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_id() {
        let id = FileId::random();
        let uuid_str = id.to_string();
        assert_eq!(id.0.to_string(), uuid_str);

        // Round-trips through both `parse_str` and `FromStr`.
        let parsed = FileId::parse_str(&uuid_str).unwrap();
        assert_eq!(id, parsed);
        let parsed = uuid_str.parse().unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_serialization() {
        let id = FileId::random();
        let json = serde_json::to_string(&id).unwrap();
        // Serializes as a plain quoted UUID string.
        assert_eq!(format!("\"{id}\""), json);

        let parsed = serde_json::from_str(&json).unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_as_parquet() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
            id.as_parquet()
        );
    }

    #[test]
    fn test_file_id_as_puffin() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.puffin",
            id.as_puffin()
        );
    }

    /// Builds a [FileMeta] with an inverted index and all other fields zeroed.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        // Omits the newer fields (file_size, num_rows, ...) on purpose:
        // `#[serde(default)]` must fill them with defaults for old metadata.
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }
}
361}