mito2/sst/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST version.
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21use store_api::storage::FileId;
22
23use crate::sst::file::{FileHandle, FileMeta, Level, MAX_LEVEL};
24use crate::sst::file_purger::FilePurgerRef;
25
26/// A version of all SSTs in a region.
27#[derive(Debug, Clone)]
28pub(crate) struct SstVersion {
29    /// SST metadata organized by levels.
30    levels: LevelMetaArray,
31}
32
33pub(crate) type SstVersionRef = Arc<SstVersion>;
34
35impl SstVersion {
36    /// Returns a new [SstVersion].
37    pub(crate) fn new() -> SstVersion {
38        SstVersion {
39            levels: new_level_meta_vec(),
40        }
41    }
42
43    /// Returns a slice to metadatas of all levels.
44    pub(crate) fn levels(&self) -> &[LevelMeta] {
45        &self.levels
46    }
47
48    /// Add files to the version.
49    ///
50    /// # Panics
51    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
52    pub(crate) fn add_files(
53        &mut self,
54        file_purger: FilePurgerRef,
55        files_to_add: impl Iterator<Item = FileMeta>,
56    ) {
57        for file in files_to_add {
58            let level = file.level;
59            self.levels[level as usize]
60                .files
61                .entry(file.file_id)
62                .or_insert_with(|| FileHandle::new(file, file_purger.clone()));
63        }
64    }
65
66    /// Remove files from the version.
67    ///
68    /// # Panics
69    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
70    pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
71        for file in files_to_remove {
72            let level = file.level;
73            if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
74                handle.mark_deleted();
75            }
76        }
77    }
78
79    /// Marks all SSTs in this version as deleted.
80    pub(crate) fn mark_all_deleted(&self) {
81        for level_meta in &self.levels {
82            for file_handle in level_meta.files.values() {
83                file_handle.mark_deleted();
84            }
85        }
86    }
87
88    /// Returns the number of rows in SST files.
89    /// For historical reasons, the result is not precise for old SST files.
90    pub(crate) fn num_rows(&self) -> u64 {
91        self.levels
92            .iter()
93            .map(|level_meta| {
94                level_meta
95                    .files
96                    .values()
97                    .map(|file_handle| {
98                        let meta = file_handle.meta_ref();
99                        meta.num_rows
100                    })
101                    .sum::<u64>()
102            })
103            .sum()
104    }
105
106    /// Returns the number of SST files.
107    pub(crate) fn num_files(&self) -> u64 {
108        self.levels
109            .iter()
110            .map(|level_meta| level_meta.files.len() as u64)
111            .sum()
112    }
113
114    /// Returns SST data files'space occupied in current version.
115    pub(crate) fn sst_usage(&self) -> u64 {
116        self.levels
117            .iter()
118            .map(|level_meta| {
119                level_meta
120                    .files
121                    .values()
122                    .map(|file_handle| {
123                        let meta = file_handle.meta_ref();
124                        meta.file_size
125                    })
126                    .sum::<u64>()
127            })
128            .sum()
129    }
130
131    /// Returns SST index files'space occupied in current version.
132    pub(crate) fn index_usage(&self) -> u64 {
133        self.levels
134            .iter()
135            .map(|level_meta| {
136                level_meta
137                    .files
138                    .values()
139                    .map(|file_handle| {
140                        let meta = file_handle.meta_ref();
141                        meta.index_file_size
142                    })
143                    .sum::<u64>()
144            })
145            .sum()
146    }
147}
148
149// We only has fixed number of level, so we use array to hold elements. This implementation
150// detail of LevelMetaArray should not be exposed to users of [LevelMetas].
151type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
152
153/// Metadata of files in the same SST level.
154#[derive(Clone)]
155pub struct LevelMeta {
156    /// Level number.
157    pub level: Level,
158    /// Handles of SSTs in this level.
159    pub files: HashMap<FileId, FileHandle>,
160}
161
162impl LevelMeta {
163    /// Returns an empty meta of specific `level`.
164    pub(crate) fn new(level: Level) -> LevelMeta {
165        LevelMeta {
166            level,
167            files: HashMap::new(),
168        }
169    }
170
171    /// Returns expired SSTs from current level.
172    pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
173        self.files
174            .values()
175            .filter(|v| {
176                let (_, end) = v.time_range();
177
178                match ttl.is_expired(&end, now) {
179                    Ok(expired) => expired,
180                    Err(e) => {
181                        common_telemetry::error!(e; "Failed to calculate region TTL expire time");
182                        false
183                    }
184                }
185            })
186            .cloned()
187            .collect()
188    }
189
190    pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
191        self.files.values()
192    }
193}
194
195impl fmt::Debug for LevelMeta {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("LevelMeta")
198            .field("level", &self.level)
199            .field("files", &self.files.keys())
200            .finish()
201    }
202}
203
204fn new_level_meta_vec() -> LevelMetaArray {
205    (0u8..MAX_LEVEL)
206        .map(LevelMeta::new)
207        .collect::<Vec<_>>()
208        .try_into()
209        .unwrap() // safety: LevelMetaArray is a fixed length array with length MAX_LEVEL
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::new_noop_file_purger;

    /// Adding an already-present file again must not create a duplicate entry.
    #[test]
    fn test_add_files() {
        let purger = new_noop_file_purger();

        let files: Vec<FileMeta> = (1..=3)
            .map(|_| FileMeta {
                file_id: FileId::random(),
                ..Default::default()
            })
            .collect();

        let mut version = SstVersion::new();
        // The two batches overlap on files[1]; adding it twice is ok.
        version.add_files(purger.clone(), files[..=1].iter().cloned());
        version.add_files(purger, files[1..].iter().cloned());

        let level0_files = &version.levels()[0].files;
        assert_eq!(level0_files.len(), 3);
        for file in &files {
            assert!(level0_files.contains_key(&file.file_id));
        }
    }
}
239}