1use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21use store_api::storage::FileId;
22
23use crate::sst::file::{FileHandle, FileMeta, Level, MAX_LEVEL};
24use crate::sst::file_purger::FilePurgerRef;
25
26#[derive(Debug, Clone)]
28pub(crate) struct SstVersion {
29 levels: LevelMetaArray,
31}
32
33pub(crate) type SstVersionRef = Arc<SstVersion>;
34
35impl SstVersion {
36 pub(crate) fn new() -> SstVersion {
38 SstVersion {
39 levels: new_level_meta_vec(),
40 }
41 }
42
43 pub(crate) fn levels(&self) -> &[LevelMeta] {
45 &self.levels
46 }
47
48 pub(crate) fn add_files(
54 &mut self,
55 file_purger: FilePurgerRef,
56 files_to_add: impl Iterator<Item = FileMeta>,
57 ) {
58 for file in files_to_add {
59 let level = file.level;
60 let new_index_version = file.index_version;
61 self.levels[level as usize]
63 .files
64 .entry(file.file_id)
65 .and_modify(|f| {
66 if *f.meta_ref() == file || f.meta_ref().is_index_up_to_date(&file) {
67 if f.index_id().version > new_index_version {
69 common_telemetry::warn!(
71 "Adding file with older index version, existing: {:?}, new: {:?}, ignoring new file",
72 f.meta_ref(),
73 file
74 );
75 }
76 } else {
77 *f = FileHandle::new(file.clone(), file_purger.clone());
79 }
80 })
81 .or_insert_with(|| {
82 FileHandle::new(file.clone(), file_purger.clone())
83 });
84 }
85 }
86
87 pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
92 for file in files_to_remove {
93 let level = file.level;
94 if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
95 handle.mark_deleted();
96 }
97 }
98 }
99
100 pub(crate) fn mark_all_deleted(&self) {
102 for level_meta in &self.levels {
103 for file_handle in level_meta.files.values() {
104 file_handle.mark_deleted();
105 }
106 }
107 }
108
109 pub(crate) fn num_rows(&self) -> u64 {
112 self.levels
113 .iter()
114 .map(|level_meta| {
115 level_meta
116 .files
117 .values()
118 .map(|file_handle| {
119 let meta = file_handle.meta_ref();
120 meta.num_rows
121 })
122 .sum::<u64>()
123 })
124 .sum()
125 }
126
127 pub(crate) fn num_files(&self) -> u64 {
129 self.levels
130 .iter()
131 .map(|level_meta| level_meta.files.len() as u64)
132 .sum()
133 }
134
135 pub(crate) fn sst_usage(&self) -> u64 {
137 self.levels
138 .iter()
139 .map(|level_meta| {
140 level_meta
141 .files
142 .values()
143 .map(|file_handle| {
144 let meta = file_handle.meta_ref();
145 meta.file_size
146 })
147 .sum::<u64>()
148 })
149 .sum()
150 }
151
152 pub(crate) fn index_usage(&self) -> u64 {
154 self.levels
155 .iter()
156 .map(|level_meta| {
157 level_meta
158 .files
159 .values()
160 .map(|file_handle| {
161 let meta = file_handle.meta_ref();
162 meta.index_file_size
163 })
164 .sum::<u64>()
165 })
166 .sum()
167 }
168}
169
170type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
173
174#[derive(Clone)]
176pub struct LevelMeta {
177 pub level: Level,
179 pub files: HashMap<FileId, FileHandle>,
181}
182
183impl LevelMeta {
184 pub(crate) fn new(level: Level) -> LevelMeta {
186 LevelMeta {
187 level,
188 files: HashMap::new(),
189 }
190 }
191
192 pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
194 self.files
195 .values()
196 .filter(|v| {
197 let (_, end) = v.time_range();
198
199 match ttl.is_expired(&end, now) {
200 Ok(expired) => expired,
201 Err(e) => {
202 common_telemetry::error!(e; "Failed to calculate region TTL expire time");
203 false
204 }
205 }
206 })
207 .cloned()
208 .collect()
209 }
210
211 pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
212 self.files.values()
213 }
214}
215
216impl fmt::Debug for LevelMeta {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 f.debug_struct("LevelMeta")
219 .field("level", &self.level)
220 .field("files", &self.files.keys())
221 .finish()
222 }
223}
224
225fn new_level_meta_vec() -> LevelMetaArray {
226 (0u8..MAX_LEVEL)
227 .map(LevelMeta::new)
228 .collect::<Vec<_>>()
229 .try_into()
230 .unwrap() }
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236 use crate::test_util::new_noop_file_purger;
237
238 #[test]
239 fn test_add_files() {
240 let purger = new_noop_file_purger();
241
242 let files = (1..=3)
243 .map(|_| FileMeta {
244 file_id: FileId::random(),
245 ..Default::default()
246 })
247 .collect::<Vec<_>>();
248
249 let mut version = SstVersion::new();
250 version.add_files(purger.clone(), files[..=1].iter().cloned());
252 version.add_files(purger, files[1..].iter().cloned());
253
254 let added_files = &version.levels()[0].files;
255 assert_eq!(added_files.len(), 3);
256 files.iter().for_each(|f| {
257 assert!(added_files.contains_key(&f.file_id));
258 });
259 }
260}