1use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19
20use common_time::{TimeToLive, Timestamp};
21use store_api::storage::FileId;
22
23use crate::sst::file::{FileHandle, FileMeta, Level, MAX_LEVEL};
24use crate::sst::file_purger::FilePurgerRef;
25
26#[derive(Debug, Clone)]
28pub(crate) struct SstVersion {
29 levels: LevelMetaArray,
31}
32
33pub(crate) type SstVersionRef = Arc<SstVersion>;
34
35impl SstVersion {
36 pub(crate) fn new() -> SstVersion {
38 SstVersion {
39 levels: new_level_meta_vec(),
40 }
41 }
42
43 pub(crate) fn levels(&self) -> &[LevelMeta] {
45 &self.levels
46 }
47
48 pub(crate) fn add_files(
54 &mut self,
55 file_purger: FilePurgerRef,
56 files_to_add: impl Iterator<Item = FileMeta>,
57 ) {
58 for file in files_to_add {
59 let level = file.level;
60 let new_index_version = file.index_version;
61 self.levels[level as usize]
63 .files
64 .entry(file.file_id)
65 .and_modify(|f| {
66 if *f.meta_ref() == file || f.meta_ref().is_index_up_to_date(&file) {
67 if f.index_id().version > new_index_version {
69 common_telemetry::warn!(
71 "Adding file with older index version, existing: {:?}, new: {:?}, ignoring new file",
72 f.meta_ref(),
73 file
74 );
75 }
76 } else {
77 *f = FileHandle::new(file.clone(), file_purger.clone());
79 }
80 })
81 .or_insert_with(|| FileHandle::new(file.clone(), file_purger.clone()));
82 }
83 }
84
85 pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator<Item = FileMeta>) {
90 for file in files_to_remove {
91 let level = file.level;
92 if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) {
93 handle.mark_deleted();
94 }
95 }
96 }
97
98 pub(crate) fn mark_all_deleted(&self) {
100 for level_meta in &self.levels {
101 for file_handle in level_meta.files.values() {
102 file_handle.mark_deleted();
103 }
104 }
105 }
106
107 pub(crate) fn num_rows(&self) -> u64 {
110 self.levels
111 .iter()
112 .map(|level_meta| {
113 level_meta
114 .files
115 .values()
116 .map(|file_handle| {
117 let meta = file_handle.meta_ref();
118 meta.num_rows
119 })
120 .sum::<u64>()
121 })
122 .sum()
123 }
124
125 pub(crate) fn num_files(&self) -> u64 {
127 self.levels
128 .iter()
129 .map(|level_meta| level_meta.files.len() as u64)
130 .sum()
131 }
132
133 pub(crate) fn sst_usage(&self) -> u64 {
135 self.levels
136 .iter()
137 .map(|level_meta| {
138 level_meta
139 .files
140 .values()
141 .map(|file_handle| {
142 let meta = file_handle.meta_ref();
143 meta.file_size
144 })
145 .sum::<u64>()
146 })
147 .sum()
148 }
149
150 pub(crate) fn index_usage(&self) -> u64 {
152 self.levels
153 .iter()
154 .map(|level_meta| {
155 level_meta
156 .files
157 .values()
158 .map(|file_handle| {
159 let meta = file_handle.meta_ref();
160 meta.index_file_size
161 })
162 .sum::<u64>()
163 })
164 .sum()
165 }
166}
167
168type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize];
171
172#[derive(Clone)]
174pub struct LevelMeta {
175 pub level: Level,
177 pub files: HashMap<FileId, FileHandle>,
179}
180
181impl LevelMeta {
182 pub(crate) fn new(level: Level) -> LevelMeta {
184 LevelMeta {
185 level,
186 files: HashMap::new(),
187 }
188 }
189
190 pub fn get_expired_files(&self, now: &Timestamp, ttl: &TimeToLive) -> Vec<FileHandle> {
192 self.files
193 .values()
194 .filter(|v| {
195 let (_, end) = v.time_range();
196
197 match ttl.is_expired(&end, now) {
198 Ok(expired) => expired,
199 Err(e) => {
200 common_telemetry::error!(e; "Failed to calculate region TTL expire time");
201 false
202 }
203 }
204 })
205 .cloned()
206 .collect()
207 }
208
209 pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
210 self.files.values()
211 }
212}
213
214impl fmt::Debug for LevelMeta {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 f.debug_struct("LevelMeta")
217 .field("level", &self.level)
218 .field("files", &self.files.keys())
219 .finish()
220 }
221}
222
223fn new_level_meta_vec() -> LevelMetaArray {
224 (0u8..MAX_LEVEL)
225 .map(LevelMeta::new)
226 .collect::<Vec<_>>()
227 .try_into()
228 .unwrap() }
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::new_noop_file_purger;

    /// Adding two overlapping batches must leave each file in the version
    /// exactly once.
    #[test]
    fn test_add_files() {
        let purger = new_noop_file_purger();

        // Three files with random ids; all other metadata is left at its default.
        let mut files = Vec::with_capacity(3);
        for _ in 0..3 {
            files.push(FileMeta {
                file_id: FileId::random(),
                ..Default::default()
            });
        }

        let mut version = SstVersion::new();
        // First batch: files 0 and 1.
        version.add_files(purger.clone(), files[..2].iter().cloned());
        // Second batch: files 1 and 2, so file 1 is added twice.
        version.add_files(purger, files[1..].iter().cloned());

        // The test expects files with default metadata to land in level 0.
        let added_files = &version.levels()[0].files;
        assert_eq!(added_files.len(), 3);
        for meta in &files {
            assert!(added_files.contains_key(&meta.file_id));
        }
    }
}