Skip to main content

mito2/sst/
version.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SST version.
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21use store_api::storage::FileId;
22
23use crate::sst::file::{FileHandle, FileMeta, Level, MAX_LEVEL};
24use crate::sst::file_purger::FilePurgerRef;
25
26/// A version of all SSTs in a region.
27#[derive(Debug, Clone)]
28pub(crate) struct SstVersion {
29    /// SST metadata organized by levels.
30    levels: LevelMetaArray,
31}
32
33pub(crate) type SstVersionRef = Arc<SstVersion>;
34
35impl SstVersion {
36    /// Returns a new [SstVersion].
37    pub(crate) fn new() -> SstVersion {
38        SstVersion {
39            levels: new_level_meta_vec(),
40        }
41    }
42
43    /// Returns a slice to metadatas of all levels.
44    pub(crate) fn levels(&self) -> &[LevelMeta] {
45        &self.levels
46    }
47
48    /// Add files to the version. If a file with the same `file_id` already exists,
49    /// it will be overwritten with the new file.
50    ///
51    /// # Panics
52    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
53    pub(crate) fn add_files(
54        &mut self,
55        file_purger: FilePurgerRef,
56        files_to_add: impl Iterator<Item = FileMeta>,
57    ) {
58        for file in files_to_add {
59            let level = file.level;
60            let new_index_version = file.index_version;
61            // If the file already exists, then we should only replace the handle when the index is outdated.
62            self.levels[level as usize]
63                .files
64                .entry(file.file_id)
65                .and_modify(|f| {
66                    if *f.meta_ref() == file || f.meta_ref().is_index_up_to_date(&file) {
67                        // same file meta or current file handle's index is up-to-date, skip adding
68                        if f.index_id().version > new_index_version {
69                            // what does it mean for us to see older index version?
70                            common_telemetry::warn!(
71                                "Adding file with older index version, existing: {:?}, new: {:?}, ignoring new file",
72                                f.meta_ref(),
73                                file
74                            );
75                        }
76                    } else {
77                        // include case like old file have no index or index is outdated
78                        *f = FileHandle::new(file.clone(), file_purger.clone());
79                    }
80                })
81                .or_insert_with(|| {
82                    FileHandle::new(file.clone(), file_purger.clone())
83                });
84        }
85    }
86
87    /// Remove files from the version.
88    ///
89    /// # Panics
90    /// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
91    pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
92        for file in files_to_remove {
93            let level = file.level;
94            if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
95                handle.mark_deleted();
96            }
97        }
98    }
99
100    /// Marks all SSTs in this version as deleted.
101    pub(crate) fn mark_all_deleted(&self) {
102        for level_meta in &self.levels {
103            for file_handle in level_meta.files.values() {
104                file_handle.mark_deleted();
105            }
106        }
107    }
108
109    /// Returns the number of rows in SST files.
110    /// For historical reasons, the result is not precise for old SST files.
111    pub(crate) fn num_rows(&self) -> u64 {
112        self.levels
113            .iter()
114            .map(|level_meta| {
115                level_meta
116                    .files
117                    .values()
118                    .map(|file_handle| {
119                        let meta = file_handle.meta_ref();
120                        meta.num_rows
121                    })
122                    .sum::<u64>()
123            })
124            .sum()
125    }
126
127    /// Returns the number of SST files.
128    pub(crate) fn num_files(&self) -> u64 {
129        self.levels
130            .iter()
131            .map(|level_meta| level_meta.files.len() as u64)
132            .sum()
133    }
134
135    /// Returns SST data files'space occupied in current version.
136    pub(crate) fn sst_usage(&self) -> u64 {
137        self.levels
138            .iter()
139            .map(|level_meta| {
140                level_meta
141                    .files
142                    .values()
143                    .map(|file_handle| {
144                        let meta = file_handle.meta_ref();
145                        meta.file_size
146                    })
147                    .sum::<u64>()
148            })
149            .sum()
150    }
151
152    /// Returns SST index files'space occupied in current version.
153    pub(crate) fn index_usage(&self) -> u64 {
154        self.levels
155            .iter()
156            .map(|level_meta| {
157                level_meta
158                    .files
159                    .values()
160                    .map(|file_handle| {
161                        let meta = file_handle.meta_ref();
162                        meta.index_file_size
163                    })
164                    .sum::<u64>()
165            })
166            .sum()
167    }
168}
169
170// We only has fixed number of level, so we use array to hold elements. This implementation
171// detail of LevelMetaArray should not be exposed to users of [LevelMetas].
172type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
173
174/// Metadata of files in the same SST level.
175#[derive(Clone)]
176pub struct LevelMeta {
177    /// Level number.
178    pub level: Level,
179    /// Handles of SSTs in this level.
180    pub files: HashMap<FileId, FileHandle>,
181}
182
183impl LevelMeta {
184    /// Returns an empty meta of specific `level`.
185    pub(crate) fn new(level: Level) -> LevelMeta {
186        LevelMeta {
187            level,
188            files: HashMap::new(),
189        }
190    }
191
192    /// Returns expired SSTs from current level.
193    pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
194        self.files
195            .values()
196            .filter(|v| {
197                let (_, end) = v.time_range();
198
199                match ttl.is_expired(&end, now) {
200                    Ok(expired) => expired,
201                    Err(e) => {
202                        common_telemetry::error!(e; "Failed to calculate region TTL expire time");
203                        false
204                    }
205                }
206            })
207            .cloned()
208            .collect()
209    }
210
211    pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
212        self.files.values()
213    }
214}
215
216impl fmt::Debug for LevelMeta {
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        f.debug_struct("LevelMeta")
219            .field("level", &self.level)
220            .field("files", &self.files.keys())
221            .finish()
222    }
223}
224
225fn new_level_meta_vec() -> LevelMetaArray {
226    (0u8..MAX_LEVEL)
227        .map(LevelMeta::new)
228        .collect::<Vec<_>>()
229        .try_into()
230        .unwrap() // safety: LevelMetaArray is a fixed length array with length MAX_LEVEL
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::new_noop_file_purger;

    /// Adding overlapping batches must leave exactly one entry per distinct file id.
    #[test]
    fn test_add_files() {
        let purger = new_noop_file_purger();

        // Three metas that differ only in their random file id; the assertions below
        // expect all of them in level 0.
        let files: Vec<FileMeta> = std::iter::repeat_with(|| FileMeta {
            file_id: FileId::random(),
            ..Default::default()
        })
        .take(3)
        .collect();

        let mut version = SstVersion::new();
        // The two batches overlap on files[1]; adding it twice is fine.
        version.add_files(purger.clone(), files[..2].iter().cloned());
        version.add_files(purger, files[1..].iter().cloned());

        let level0_files = &version.levels()[0].files;
        assert_eq!(level0_files.len(), 3);
        for meta in &files {
            assert!(level0_files.contains_key(&meta.file_id));
        }
    }
}
260}