1use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21use store_api::storage::FileId;
22
23use crate::sst::file::{FileHandle, FileMeta, Level, MAX_LEVEL};
24use crate::sst::file_purger::FilePurgerRef;
25
26#[derive(Debug, Clone)]
28pub(crate) struct SstVersion {
29 levels: LevelMetaArray,
31}
32
33pub(crate) type SstVersionRef = Arc<SstVersion>;
34
35impl SstVersion {
36 pub(crate) fn new() -> SstVersion {
38 SstVersion {
39 levels: new_level_meta_vec(),
40 }
41 }
42
43 pub(crate) fn levels(&self) -> &[LevelMeta] {
45 &self.levels
46 }
47
48 pub(crate) fn add_files(
53 &mut self,
54 file_purger: FilePurgerRef,
55 files_to_add: impl Iterator<Item = FileMeta>,
56 ) {
57 for file in files_to_add {
58 let level = file.level;
59 self.levels[level as usize]
60 .files
61 .entry(file.file_id)
62 .or_insert_with(|| FileHandle::new(file, file_purger.clone()));
63 }
64 }
65
66 pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
71 for file in files_to_remove {
72 let level = file.level;
73 if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
74 handle.mark_deleted();
75 }
76 }
77 }
78
79 pub(crate) fn mark_all_deleted(&self) {
81 for level_meta in &self.levels {
82 for file_handle in level_meta.files.values() {
83 file_handle.mark_deleted();
84 }
85 }
86 }
87
88 pub(crate) fn num_rows(&self) -> u64 {
91 self.levels
92 .iter()
93 .map(|level_meta| {
94 level_meta
95 .files
96 .values()
97 .map(|file_handle| {
98 let meta = file_handle.meta_ref();
99 meta.num_rows
100 })
101 .sum::<u64>()
102 })
103 .sum()
104 }
105
106 pub(crate) fn num_files(&self) -> u64 {
108 self.levels
109 .iter()
110 .map(|level_meta| level_meta.files.len() as u64)
111 .sum()
112 }
113
114 pub(crate) fn sst_usage(&self) -> u64 {
116 self.levels
117 .iter()
118 .map(|level_meta| {
119 level_meta
120 .files
121 .values()
122 .map(|file_handle| {
123 let meta = file_handle.meta_ref();
124 meta.file_size
125 })
126 .sum::<u64>()
127 })
128 .sum()
129 }
130
131 pub(crate) fn index_usage(&self) -> u64 {
133 self.levels
134 .iter()
135 .map(|level_meta| {
136 level_meta
137 .files
138 .values()
139 .map(|file_handle| {
140 let meta = file_handle.meta_ref();
141 meta.index_file_size
142 })
143 .sum::<u64>()
144 })
145 .sum()
146 }
147}
148
149type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
152
153#[derive(Clone)]
155pub struct LevelMeta {
156 pub level: Level,
158 pub files: HashMap<FileId, FileHandle>,
160}
161
162impl LevelMeta {
163 pub(crate) fn new(level: Level) -> LevelMeta {
165 LevelMeta {
166 level,
167 files: HashMap::new(),
168 }
169 }
170
171 pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
173 self.files
174 .values()
175 .filter(|v| {
176 let (_, end) = v.time_range();
177
178 match ttl.is_expired(&end, now) {
179 Ok(expired) => expired,
180 Err(e) => {
181 common_telemetry::error!(e; "Failed to calculate region TTL expire time");
182 false
183 }
184 }
185 })
186 .cloned()
187 .collect()
188 }
189
190 pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
191 self.files.values()
192 }
193}
194
195impl fmt::Debug for LevelMeta {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("LevelMeta")
198 .field("level", &self.level)
199 .field("files", &self.files.keys())
200 .finish()
201 }
202}
203
204fn new_level_meta_vec() -> LevelMetaArray {
205 (0u8..MAX_LEVEL)
206 .map(LevelMeta::new)
207 .collect::<Vec<_>>()
208 .try_into()
209 .unwrap() }
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215 use crate::test_util::new_noop_file_purger;
216
217 #[test]
218 fn test_add_files() {
219 let purger = new_noop_file_purger();
220
221 let files = (1..=3)
222 .map(|_| FileMeta {
223 file_id: FileId::random(),
224 ..Default::default()
225 })
226 .collect::<Vec<_>>();
227
228 let mut version = SstVersion::new();
229 version.add_files(purger.clone(), files[..=1].iter().cloned());
231 version.add_files(purger, files[1..].iter().cloned());
232
233 let added_files = &version.levels()[0].files;
234 assert_eq!(added_files.len(), 3);
235 files.iter().for_each(|f| {
236 assert!(added_files.contains_key(&f.file_id));
237 });
238 }
239}