1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31 pub files: HashMap<FileRef, usize>,
33}
34
35#[derive(Debug)]
40pub struct FileReferenceManager {
41 node_id: Option<u64>,
43 files_per_region: DashMap<RegionId, RegionFileRefs>,
45 gc_enabled: bool,
48}
49
50pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
51
52impl FileReferenceManager {
53 pub fn new(node_id: Option<u64>) -> Self {
54 Self {
55 node_id,
56 files_per_region: Default::default(),
57 gc_enabled: false,
58 }
59 }
60
61 pub fn with_gc_enabled(node_id: Option<u64>, gc_enabled: bool) -> Self {
63 Self {
64 node_id,
65 files_per_region: Default::default(),
66 gc_enabled,
67 }
68 }
69
70 pub fn is_gc_enabled(&self) -> bool {
76 self.gc_enabled
77 }
78
79 fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
80 let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) {
81 file_refs.clone()
82 } else {
83 return None;
85 };
86
87 if file_refs.files.is_empty() {
88 return Some(HashSet::new());
91 }
92
93 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
94
95 debug!(
96 "Get file refs for region {}, node {:?}, {} files",
97 region_id,
98 self.node_id,
99 ref_file_set.len(),
100 );
101
102 Some(ref_file_set)
103 }
104
105 pub(crate) async fn get_snapshot_of_file_refs(
110 &self,
111 query_regions_for_mem: Vec<MitoRegionRef>,
112 dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)>,
113 ) -> Result<FileRefsManifest> {
114 let mut ref_files = HashMap::new();
115 for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
117 if let Some(files) = self.ref_file_set(region_id) {
118 ref_files.insert(region_id, files);
119 }
120 }
121
122 let mut manifest_version = HashMap::new();
123
124 let mut cross_region_refs = HashMap::new();
125
126 for (dst_region, src_regions) in &dst_region_to_src_regions {
128 let manifest = dst_region.manifest_ctx.manifest().await;
129 for meta in manifest.files.values() {
130 if src_regions.contains(&meta.region_id) {
131 cross_region_refs
132 .entry(meta.region_id)
133 .or_insert_with(HashSet::new)
134 .insert(dst_region.region_id());
135 ref_files
139 .entry(meta.region_id)
140 .or_insert_with(HashSet::new)
141 .insert(FileRef::new(
142 meta.region_id,
143 meta.file_id,
144 meta.index_version(),
145 ));
146 }
147 }
148 manifest_version.insert(dst_region.region_id(), manifest.manifest_version);
150 }
151
152 for r in &query_regions_for_mem {
153 let manifest = r.manifest_ctx.manifest().await;
154 ref_files.entry(r.region_id()).and_modify(|refs| {
156 *refs = std::mem::take(refs)
157 .into_iter()
158 .filter(|f| !manifest.files.contains_key(&f.file_id))
159 .collect();
160 });
161 manifest_version.insert(r.region_id(), manifest.manifest_version);
162 }
163
164 Ok(FileRefsManifest {
166 file_refs: ref_files,
167 manifest_version,
168 cross_region_refs,
169 })
170 }
171
172 pub fn add_file(&self, file_meta: &FileMeta) {
176 let region_id = file_meta.region_id;
177 let mut is_new = false;
178 {
179 let file_ref = FileRef::new(
180 file_meta.region_id,
181 file_meta.file_id,
182 file_meta.index_version(),
183 );
184 self.files_per_region
185 .entry(region_id)
186 .and_modify(|refs| {
187 refs.files
188 .entry(file_ref.clone())
189 .and_modify(|count| *count += 1)
190 .or_insert_with(|| {
191 is_new = true;
192 1
193 });
194 })
195 .or_insert_with(|| RegionFileRefs {
196 files: HashMap::from_iter([(file_ref, 1)]),
197 });
198 }
199 if is_new {
200 GC_REF_FILE_CNT.inc();
201 }
202 }
203
204 pub fn remove_file(&self, file_meta: &FileMeta) {
207 let region_id = file_meta.region_id;
208 let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
209
210 let mut remove_table_entry = false;
211 let mut remove_file_ref = false;
212 let mut file_cnt = 0;
213
214 let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
215 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
216 if *count > 0 {
217 *count -= 1;
218 }
219 if *count == 0 {
220 remove_file_ref = true;
221 }
222 });
223 if let std::collections::hash_map::Entry::Occupied(o) = entry
224 && remove_file_ref
225 {
226 o.remove_entry();
227 }
228
229 file_cnt = refs.files.len();
230
231 if refs.files.is_empty() {
232 remove_table_entry = true;
233 }
234 });
235
236 if let Entry::Occupied(o) = region_ref
237 && remove_table_entry
238 {
239 o.remove_entry();
240 }
241 if remove_file_ref {
242 GC_REF_FILE_CNT.dec();
243 }
244 }
245
246 pub fn node_id(&self) -> Option<u64> {
247 self.node_id
248 }
249}
250
251#[cfg(test)]
252mod tests {
253 use std::num::NonZeroU64;
254
255 use smallvec::SmallVec;
256 use store_api::storage::{FileId, RegionId};
257
258 use super::*;
259 use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};
260
261 #[tokio::test]
262 async fn test_file_ref_mgr() {
263 common_telemetry::init_default_ut_logging();
264
265 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
266
267 let file_ref_mgr = FileReferenceManager::new(None);
268
269 let file_meta = FileMeta {
270 region_id: sst_file_id.region_id(),
271 file_id: sst_file_id.file_id(),
272 time_range: FileTimeRange::default(),
273 level: 0,
274 file_size: 4096,
275 max_row_group_uncompressed_size: 4096,
276 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
277 indexes: vec![ColumnIndexMetadata {
278 column_id: 0,
279 created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
280 }],
281 index_file_size: 4096,
282 index_version: 0,
283 num_rows: 1024,
284 num_row_groups: 1,
285 sequence: NonZeroU64::new(4096),
286 partition_expr: None,
287 num_series: 0,
288 };
289
290 file_ref_mgr.add_file(&file_meta);
291
292 assert_eq!(
293 file_ref_mgr
294 .files_per_region
295 .get(&file_meta.region_id)
296 .unwrap()
297 .files,
298 HashMap::from_iter([(
299 FileRef::new(
300 file_meta.region_id,
301 file_meta.file_id,
302 file_meta.index_version()
303 ),
304 1
305 )])
306 );
307
308 file_ref_mgr.add_file(&file_meta);
309
310 let expected_region_ref_manifest = HashSet::from_iter([FileRef::new(
311 file_meta.region_id,
312 file_meta.file_id,
313 file_meta.index_version(),
314 )]);
315
316 assert_eq!(
317 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
318 expected_region_ref_manifest
319 );
320
321 assert_eq!(
322 file_ref_mgr
323 .files_per_region
324 .get(&file_meta.region_id)
325 .unwrap()
326 .files,
327 HashMap::from_iter([(
328 FileRef::new(
329 file_meta.region_id,
330 file_meta.file_id,
331 file_meta.index_version()
332 ),
333 2
334 )])
335 );
336
337 assert_eq!(
338 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
339 expected_region_ref_manifest
340 );
341
342 file_ref_mgr.remove_file(&file_meta);
343
344 assert_eq!(
345 file_ref_mgr
346 .files_per_region
347 .get(&file_meta.region_id)
348 .unwrap()
349 .files,
350 HashMap::from_iter([(
351 FileRef::new(
352 file_meta.region_id,
353 file_meta.file_id,
354 file_meta.index_version()
355 ),
356 1
357 )])
358 );
359
360 assert_eq!(
361 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
362 expected_region_ref_manifest
363 );
364
365 file_ref_mgr.remove_file(&file_meta);
366
367 assert!(
368 file_ref_mgr
369 .files_per_region
370 .get(&file_meta.region_id)
371 .is_none(),
372 "{:?}",
373 file_ref_mgr.files_per_region
374 );
375
376 assert!(
377 file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
378 "{:?}",
379 file_ref_mgr.files_per_region
380 );
381 }
382}