mito2/sst/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST version.
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21
22use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL};
23use crate::sst::file_purger::FilePurgerRef;
24
25/// A version of all SSTs in a region.
26#[derive(Debug, Clone)]
27pub(crate) struct SstVersion {
28    /// SST metadata organized by levels.
29    levels: LevelMetaArray,
30}
31
32pub(crate) type SstVersionRef = Arc<SstVersion>;
33
34impl SstVersion {
35    /// Returns a new [SstVersion].
36    pub(crate) fn new() -> SstVersion {
37        SstVersion {
38            levels: new_level_meta_vec(),
39        }
40    }
41
42    /// Returns a slice to metadatas of all levels.
43    pub(crate) fn levels(&self) -> &[LevelMeta] {
44        &self.levels
45    }
46
47    /// Add files to the version.
48    ///
49    /// # Panics
50    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
51    pub(crate) fn add_files(
52        &mut self,
53        file_purger: FilePurgerRef,
54        files_to_add: impl Iterator<Item = FileMeta>,
55    ) {
56        for file in files_to_add {
57            let level = file.level;
58            self.levels[level as usize]
59                .files
60                .entry(file.file_id)
61                .or_insert_with(|| FileHandle::new(file, file_purger.clone()));
62        }
63    }
64
65    /// Remove files from the version.
66    ///
67    /// # Panics
68    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
69    pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
70        for file in files_to_remove {
71            let level = file.level;
72            if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
73                handle.mark_deleted();
74            }
75        }
76    }
77
78    /// Marks all SSTs in this version as deleted.
79    pub(crate) fn mark_all_deleted(&self) {
80        for level_meta in &self.levels {
81            for file_handle in level_meta.files.values() {
82                file_handle.mark_deleted();
83            }
84        }
85    }
86
87    /// Returns the number of rows in SST files.
88    /// For historical reasons, the result is not precise for old SST files.
89    pub(crate) fn num_rows(&self) -> u64 {
90        self.levels
91            .iter()
92            .map(|level_meta| {
93                level_meta
94                    .files
95                    .values()
96                    .map(|file_handle| {
97                        let meta = file_handle.meta_ref();
98                        meta.num_rows
99                    })
100                    .sum::<u64>()
101            })
102            .sum()
103    }
104
105    /// Returns the number of SST files.
106    pub(crate) fn num_files(&self) -> u64 {
107        self.levels
108            .iter()
109            .map(|level_meta| level_meta.files.len() as u64)
110            .sum()
111    }
112
113    /// Returns SST data files'space occupied in current version.
114    pub(crate) fn sst_usage(&self) -> u64 {
115        self.levels
116            .iter()
117            .map(|level_meta| {
118                level_meta
119                    .files
120                    .values()
121                    .map(|file_handle| {
122                        let meta = file_handle.meta_ref();
123                        meta.file_size
124                    })
125                    .sum::<u64>()
126            })
127            .sum()
128    }
129
130    /// Returns SST index files'space occupied in current version.
131    pub(crate) fn index_usage(&self) -> u64 {
132        self.levels
133            .iter()
134            .map(|level_meta| {
135                level_meta
136                    .files
137                    .values()
138                    .map(|file_handle| {
139                        let meta = file_handle.meta_ref();
140                        meta.index_file_size
141                    })
142                    .sum::<u64>()
143            })
144            .sum()
145    }
146}
147
148// We only has fixed number of level, so we use array to hold elements. This implementation
149// detail of LevelMetaArray should not be exposed to users of [LevelMetas].
150type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
151
152/// Metadata of files in the same SST level.
153#[derive(Clone)]
154pub struct LevelMeta {
155    /// Level number.
156    pub level: Level,
157    /// Handles of SSTs in this level.
158    pub files: HashMap<FileId, FileHandle>,
159}
160
161impl LevelMeta {
162    /// Returns an empty meta of specific `level`.
163    pub(crate) fn new(level: Level) -> LevelMeta {
164        LevelMeta {
165            level,
166            files: HashMap::new(),
167        }
168    }
169
170    /// Returns expired SSTs from current level.
171    pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
172        self.files
173            .values()
174            .filter(|v| {
175                let (_, end) = v.time_range();
176
177                match ttl.is_expired(&end, now) {
178                    Ok(expired) => expired,
179                    Err(e) => {
180                        common_telemetry::error!(e; "Failed to calculate region TTL expire time");
181                        false
182                    }
183                }
184            })
185            .cloned()
186            .collect()
187    }
188
189    pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
190        self.files.values()
191    }
192}
193
194impl fmt::Debug for LevelMeta {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        f.debug_struct("LevelMeta")
197            .field("level", &self.level)
198            .field("files", &self.files.keys())
199            .finish()
200    }
201}
202
203fn new_level_meta_vec() -> LevelMetaArray {
204    (0u8..MAX_LEVEL)
205        .map(LevelMeta::new)
206        .collect::<Vec<_>>()
207        .try_into()
208        .unwrap() // safety: LevelMetaArray is a fixed length array with length MAX_LEVEL
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use crate::test_util::new_noop_file_purger;
215
216    #[test]
217    fn test_add_files() {
218        let purger = new_noop_file_purger();
219
220        let files = (1..=3)
221            .map(|_| FileMeta {
222                file_id: FileId::random(),
223                ..Default::default()
224            })
225            .collect::<Vec<_>>();
226
227        let mut version = SstVersion::new();
228        // files[1] is added multiple times, and that's ok.
229        version.add_files(purger.clone(), files[..=1].iter().cloned());
230        version.add_files(purger, files[1..].iter().cloned());
231
232        let added_files = &version.levels()[0].files;
233        assert_eq!(added_files.len(), 3);
234        files.iter().for_each(|f| {
235            assert!(added_files.contains_key(&f.file_id));
236        });
237    }
238}