1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31 pub files: HashMap<FileRef, usize>,
33}
34
35#[derive(Debug)]
40pub struct FileReferenceManager {
41 node_id: Option<u64>,
43 files_per_region: DashMap<RegionId, RegionFileRefs>,
45}
46
47pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50 pub fn new(node_id: Option<u64>) -> Self {
51 Self {
52 node_id,
53 files_per_region: Default::default(),
54 }
55 }
56
57 fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58 let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) {
59 file_refs.clone()
60 } else {
61 return None;
63 };
64
65 if file_refs.files.is_empty() {
66 return Some(HashSet::new());
69 }
70
71 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73 debug!(
74 "Get file refs for region {}, node {:?}, {} files",
75 region_id,
76 self.node_id,
77 ref_file_set.len(),
78 );
79
80 Some(ref_file_set)
81 }
82
83 pub(crate) async fn get_snapshot_of_file_refs(
88 &self,
89 query_regions_for_mem: Vec<MitoRegionRef>,
90 related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
91 ) -> Result<FileRefsManifest> {
92 let mut ref_files = HashMap::new();
93 for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
95 if let Some(files) = self.ref_file_set(region_id) {
96 ref_files.insert(region_id, files);
97 }
98 }
99
100 let mut manifest_version = HashMap::new();
101
102 for (related_region, queries) in &related_regions_in_manifest {
104 let queries = queries.iter().cloned().collect::<HashSet<_>>();
105 let manifest = related_region.manifest_ctx.manifest().await;
106 for meta in manifest.files.values() {
107 if queries.contains(&meta.region_id) {
108 ref_files
112 .entry(meta.region_id)
113 .or_insert_with(HashSet::new)
114 .insert(FileRef::new(
115 meta.region_id,
116 meta.file_id,
117 meta.index_version(),
118 ));
119 }
120 }
121 manifest_version.insert(related_region.region_id(), manifest.manifest_version);
123 }
124
125 for r in &query_regions_for_mem {
126 let manifest = r.manifest_ctx.manifest().await;
127 ref_files.entry(r.region_id()).and_modify(|refs| {
129 *refs = std::mem::take(refs)
130 .into_iter()
131 .filter(|f| !manifest.files.contains_key(&f.file_id))
132 .collect();
133 });
134 manifest_version.insert(r.region_id(), manifest.manifest_version);
135 }
136
137 Ok(FileRefsManifest {
139 file_refs: ref_files,
140 manifest_version,
141 })
142 }
143
144 pub fn add_file(&self, file_meta: &FileMeta) {
148 let region_id = file_meta.region_id;
149 let mut is_new = false;
150 {
151 let file_ref = FileRef::new(
152 file_meta.region_id,
153 file_meta.file_id,
154 file_meta.index_version(),
155 );
156 self.files_per_region
157 .entry(region_id)
158 .and_modify(|refs| {
159 refs.files
160 .entry(file_ref.clone())
161 .and_modify(|count| *count += 1)
162 .or_insert_with(|| {
163 is_new = true;
164 1
165 });
166 })
167 .or_insert_with(|| RegionFileRefs {
168 files: HashMap::from_iter([(file_ref, 1)]),
169 });
170 }
171 if is_new {
172 GC_REF_FILE_CNT.inc();
173 }
174 }
175
176 pub fn remove_file(&self, file_meta: &FileMeta) {
179 let region_id = file_meta.region_id;
180 let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
181
182 let mut remove_table_entry = false;
183 let mut remove_file_ref = false;
184 let mut file_cnt = 0;
185
186 let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
187 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
188 if *count > 0 {
189 *count -= 1;
190 }
191 if *count == 0 {
192 remove_file_ref = true;
193 }
194 });
195 if let std::collections::hash_map::Entry::Occupied(o) = entry
196 && remove_file_ref
197 {
198 o.remove_entry();
199 }
200
201 file_cnt = refs.files.len();
202
203 if refs.files.is_empty() {
204 remove_table_entry = true;
205 }
206 });
207
208 if let Entry::Occupied(o) = region_ref
209 && remove_table_entry
210 {
211 o.remove_entry();
212 }
213 if remove_file_ref {
214 GC_REF_FILE_CNT.dec();
215 }
216 }
217
218 pub fn node_id(&self) -> Option<u64> {
219 self.node_id
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use std::num::NonZeroU64;
226
227 use smallvec::SmallVec;
228 use store_api::storage::{FileId, RegionId};
229
230 use super::*;
231 use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};
232
233 #[tokio::test]
234 async fn test_file_ref_mgr() {
235 common_telemetry::init_default_ut_logging();
236
237 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
238
239 let file_ref_mgr = FileReferenceManager::new(None);
240
241 let file_meta = FileMeta {
242 region_id: sst_file_id.region_id(),
243 file_id: sst_file_id.file_id(),
244 time_range: FileTimeRange::default(),
245 level: 0,
246 file_size: 4096,
247 max_row_group_uncompressed_size: 4096,
248 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
249 indexes: vec![ColumnIndexMetadata {
250 column_id: 0,
251 created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
252 }],
253 index_file_size: 4096,
254 index_version: 0,
255 num_rows: 1024,
256 num_row_groups: 1,
257 sequence: NonZeroU64::new(4096),
258 partition_expr: None,
259 num_series: 0,
260 };
261
262 file_ref_mgr.add_file(&file_meta);
263
264 assert_eq!(
265 file_ref_mgr
266 .files_per_region
267 .get(&file_meta.region_id)
268 .unwrap()
269 .files,
270 HashMap::from_iter([(
271 FileRef::new(
272 file_meta.region_id,
273 file_meta.file_id,
274 file_meta.index_version()
275 ),
276 1
277 )])
278 );
279
280 file_ref_mgr.add_file(&file_meta);
281
282 let expected_region_ref_manifest = HashSet::from_iter([FileRef::new(
283 file_meta.region_id,
284 file_meta.file_id,
285 file_meta.index_version(),
286 )]);
287
288 assert_eq!(
289 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
290 expected_region_ref_manifest
291 );
292
293 assert_eq!(
294 file_ref_mgr
295 .files_per_region
296 .get(&file_meta.region_id)
297 .unwrap()
298 .files,
299 HashMap::from_iter([(
300 FileRef::new(
301 file_meta.region_id,
302 file_meta.file_id,
303 file_meta.index_version()
304 ),
305 2
306 )])
307 );
308
309 assert_eq!(
310 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
311 expected_region_ref_manifest
312 );
313
314 file_ref_mgr.remove_file(&file_meta);
315
316 assert_eq!(
317 file_ref_mgr
318 .files_per_region
319 .get(&file_meta.region_id)
320 .unwrap()
321 .files,
322 HashMap::from_iter([(
323 FileRef::new(
324 file_meta.region_id,
325 file_meta.file_id,
326 file_meta.index_version()
327 ),
328 1
329 )])
330 );
331
332 assert_eq!(
333 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
334 expected_region_ref_manifest
335 );
336
337 file_ref_mgr.remove_file(&file_meta);
338
339 assert!(
340 file_ref_mgr
341 .files_per_region
342 .get(&file_meta.region_id)
343 .is_none(),
344 "{:?}",
345 file_ref_mgr.files_per_region
346 );
347
348 assert!(
349 file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
350 "{:?}",
351 file_ref_mgr.files_per_region
352 );
353 }
354}