1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31 pub files: HashMap<FileRef, usize>,
33}
34
35#[derive(Debug)]
40pub struct FileReferenceManager {
41 node_id: Option<u64>,
43 files_per_region: DashMap<RegionId, RegionFileRefs>,
45}
46
47pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50 pub fn new(node_id: Option<u64>) -> Self {
51 Self {
52 node_id,
53 files_per_region: Default::default(),
54 }
55 }
56
57 fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58 let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) {
59 file_refs.clone()
60 } else {
61 return None;
63 };
64
65 if file_refs.files.is_empty() {
66 return Some(HashSet::new());
69 }
70
71 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73 debug!(
74 "Get file refs for region {}, node {:?}, {} files",
75 region_id,
76 self.node_id,
77 ref_file_set.len(),
78 );
79
80 Some(ref_file_set)
81 }
82
83 pub(crate) async fn get_snapshot_of_file_refs(
86 &self,
87 query_regions: Vec<MitoRegionRef>,
88 related_regions: Vec<(MitoRegionRef, Vec<RegionId>)>,
89 ) -> Result<FileRefsManifest> {
90 let mut ref_files = HashMap::new();
91 for region_id in query_regions.iter().map(|r| r.region_id()) {
93 if let Some(files) = self.ref_file_set(region_id) {
94 ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
95 }
96 }
97
98 let mut manifest_version = HashMap::new();
99
100 for r in &query_regions {
101 let manifest = r.manifest_ctx.manifest().await;
102 manifest_version.insert(r.region_id(), manifest.manifest_version);
103 }
104
105 for (related_region, queries) in &related_regions {
107 let queries = queries.iter().cloned().collect::<HashSet<_>>();
108 let manifest = related_region.manifest_ctx.manifest().await;
109 for meta in manifest.files.values() {
110 if queries.contains(&meta.region_id) {
111 ref_files
112 .entry(meta.region_id)
113 .or_insert_with(HashSet::new)
114 .insert(meta.file_id);
115 }
116 }
117 manifest_version.insert(related_region.region_id(), manifest.manifest_version);
119 }
120
121 Ok(FileRefsManifest {
123 file_refs: ref_files,
124 manifest_version,
125 })
126 }
127
128 pub fn add_file(&self, file_meta: &FileMeta) {
132 let region_id = file_meta.region_id;
133 let mut is_new = false;
134 {
135 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
136 self.files_per_region
137 .entry(region_id)
138 .and_modify(|refs| {
139 refs.files
140 .entry(file_ref.clone())
141 .and_modify(|count| *count += 1)
142 .or_insert_with(|| {
143 is_new = true;
144 1
145 });
146 })
147 .or_insert_with(|| RegionFileRefs {
148 files: HashMap::from_iter([(file_ref, 1)]),
149 });
150 }
151 if is_new {
152 GC_REF_FILE_CNT.inc();
153 }
154 }
155
156 pub fn remove_file(&self, file_meta: &FileMeta) {
159 let region_id = file_meta.region_id;
160 let file_ref = FileRef::new(region_id, file_meta.file_id);
161
162 let mut remove_table_entry = false;
163 let mut remove_file_ref = false;
164 let mut file_cnt = 0;
165
166 let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
167 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
168 if *count > 0 {
169 *count -= 1;
170 }
171 if *count == 0 {
172 remove_file_ref = true;
173 }
174 });
175 if let std::collections::hash_map::Entry::Occupied(o) = entry
176 && remove_file_ref
177 {
178 o.remove_entry();
179 }
180
181 file_cnt = refs.files.len();
182
183 if refs.files.is_empty() {
184 remove_table_entry = true;
185 }
186 });
187
188 if let Entry::Occupied(o) = region_ref
189 && remove_table_entry
190 {
191 o.remove_entry();
192 }
193 if remove_file_ref {
194 GC_REF_FILE_CNT.dec();
195 }
196 }
197
198 pub fn node_id(&self) -> Option<u64> {
199 self.node_id
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use std::num::NonZeroU64;
206
207 use smallvec::SmallVec;
208 use store_api::storage::{FileId, RegionId};
209
210 use super::*;
211 use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
212
213 #[tokio::test]
214 async fn test_file_ref_mgr() {
215 common_telemetry::init_default_ut_logging();
216
217 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
218
219 let file_ref_mgr = FileReferenceManager::new(None);
220
221 let file_meta = FileMeta {
222 region_id: sst_file_id.region_id(),
223 file_id: sst_file_id.file_id(),
224 time_range: FileTimeRange::default(),
225 level: 0,
226 file_size: 4096,
227 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
228 index_file_size: 4096,
229 index_file_id: None,
230 num_rows: 1024,
231 num_row_groups: 1,
232 sequence: NonZeroU64::new(4096),
233 partition_expr: None,
234 num_series: 0,
235 };
236
237 file_ref_mgr.add_file(&file_meta);
238
239 assert_eq!(
240 file_ref_mgr
241 .files_per_region
242 .get(&file_meta.region_id)
243 .unwrap()
244 .files,
245 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
246 );
247
248 file_ref_mgr.add_file(&file_meta);
249
250 let expected_region_ref_manifest =
251 HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
252
253 assert_eq!(
254 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
255 expected_region_ref_manifest
256 );
257
258 assert_eq!(
259 file_ref_mgr
260 .files_per_region
261 .get(&file_meta.region_id)
262 .unwrap()
263 .files,
264 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
265 );
266
267 assert_eq!(
268 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
269 expected_region_ref_manifest
270 );
271
272 file_ref_mgr.remove_file(&file_meta);
273
274 assert_eq!(
275 file_ref_mgr
276 .files_per_region
277 .get(&file_meta.region_id)
278 .unwrap()
279 .files,
280 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
281 );
282
283 assert_eq!(
284 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
285 expected_region_ref_manifest
286 );
287
288 file_ref_mgr.remove_file(&file_meta);
289
290 assert!(
291 file_ref_mgr
292 .files_per_region
293 .get(&file_meta.region_id)
294 .is_none(),
295 "{:?}",
296 file_ref_mgr.files_per_region
297 );
298
299 assert!(
300 file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
301 "{:?}",
302 file_ref_mgr.files_per_region
303 );
304 }
305}