1use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21
22use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL};
23use crate::sst::file_purger::FilePurgerRef;
24
/// Tracks the SST files that belong to each level.
///
/// Files are stored as [`FileHandle`]s inside one [`LevelMeta`] per level.
#[derive(Debug, Clone)]
pub(crate) struct SstVersion {
    /// Per-level file metadata; the array index is the level number.
    levels: LevelMetaArray,
}
31
/// Reference-counted ([`Arc`]) pointer to an [`SstVersion`].
pub(crate) type SstVersionRef = Arc<SstVersion>;
33
34impl SstVersion {
35 pub(crate) fn new() -> SstVersion {
37 SstVersion {
38 levels: new_level_meta_vec(),
39 }
40 }
41
42 pub(crate) fn levels(&self) -> &[LevelMeta] {
44 &self.levels
45 }
46
47 pub(crate) fn add_files(
52 &mut self,
53 file_purger: FilePurgerRef,
54 files_to_add: impl Iterator<Item = FileMeta>,
55 ) {
56 for file in files_to_add {
57 let level = file.level;
58 self.levels[level as usize]
59 .files
60 .entry(file.file_id)
61 .or_insert_with(|| FileHandle::new(file, file_purger.clone()));
62 }
63 }
64
65 pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
70 for file in files_to_remove {
71 let level = file.level;
72 if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
73 handle.mark_deleted();
74 }
75 }
76 }
77
78 pub(crate) fn mark_all_deleted(&self) {
80 for level_meta in &self.levels {
81 for file_handle in level_meta.files.values() {
82 file_handle.mark_deleted();
83 }
84 }
85 }
86
87 pub(crate) fn num_rows(&self) -> u64 {
90 self.levels
91 .iter()
92 .map(|level_meta| {
93 level_meta
94 .files
95 .values()
96 .map(|file_handle| {
97 let meta = file_handle.meta_ref();
98 meta.num_rows
99 })
100 .sum::<u64>()
101 })
102 .sum()
103 }
104
105 pub(crate) fn num_files(&self) -> u64 {
107 self.levels
108 .iter()
109 .map(|level_meta| level_meta.files.len() as u64)
110 .sum()
111 }
112
113 pub(crate) fn sst_usage(&self) -> u64 {
115 self.levels
116 .iter()
117 .map(|level_meta| {
118 level_meta
119 .files
120 .values()
121 .map(|file_handle| {
122 let meta = file_handle.meta_ref();
123 meta.file_size
124 })
125 .sum::<u64>()
126 })
127 .sum()
128 }
129
130 pub(crate) fn index_usage(&self) -> u64 {
132 self.levels
133 .iter()
134 .map(|level_meta| {
135 level_meta
136 .files
137 .values()
138 .map(|file_handle| {
139 let meta = file_handle.meta_ref();
140 meta.index_file_size
141 })
142 .sum::<u64>()
143 })
144 .sum()
145 }
146}
147
/// Fixed-size array holding one [`LevelMeta`] per level, `MAX_LEVEL` entries in total.
type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
151
/// Metadata of a single level: its level number and the files it contains.
#[derive(Clone)]
pub struct LevelMeta {
    /// Level number of this entry.
    pub level: Level,
    /// Handles of the files in this level, keyed by file id.
    pub files: HashMap<FileId, FileHandle>,
}
160
161impl LevelMeta {
162 pub(crate) fn new(level: Level) -> LevelMeta {
164 LevelMeta {
165 level,
166 files: HashMap::new(),
167 }
168 }
169
170 pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
172 self.files
173 .values()
174 .filter(|v| {
175 let (_, end) = v.time_range();
176
177 match ttl.is_expired(&end, now) {
178 Ok(expired) => expired,
179 Err(e) => {
180 common_telemetry::error!(e; "Failed to calculate region TTL expire time");
181 false
182 }
183 }
184 })
185 .cloned()
186 .collect()
187 }
188
189 pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
190 self.files.values()
191 }
192}
193
194impl fmt::Debug for LevelMeta {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("LevelMeta")
197 .field("level", &self.level)
198 .field("files", &self.files.keys())
199 .finish()
200 }
201}
202
203fn new_level_meta_vec() -> LevelMetaArray {
204 (0u8..MAX_LEVEL)
205 .map(LevelMeta::new)
206 .collect::<Vec<_>>()
207 .try_into()
208 .unwrap() }
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::new_noop_file_purger;

    /// Adding overlapping batches must register each distinct file exactly once.
    #[test]
    fn test_add_files() {
        let purger = new_noop_file_purger();

        // Three metas with random ids; all other fields take their defaults.
        let mut files = Vec::with_capacity(3);
        for _ in 0..3 {
            files.push(FileMeta {
                file_id: FileId::random(),
                ..Default::default()
            });
        }

        // The two batches overlap on the middle file.
        let first_batch = &files[..2];
        let second_batch = &files[1..];

        let mut version = SstVersion::new();
        version.add_files(purger.clone(), first_batch.iter().cloned());
        version.add_files(purger, second_batch.iter().cloned());

        let added_files = &version.levels()[0].files;
        assert_eq!(added_files.len(), 3);
        for meta in &files {
            assert!(added_files.contains_key(&meta.file_id));
        }
    }
}