1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31 pub files: HashMap<FileRef, usize>,
33}
34
35#[derive(Debug)]
40pub struct FileReferenceManager {
41 node_id: Option<u64>,
43 files_per_region: DashMap<RegionId, RegionFileRefs>,
45}
46
47pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50 pub fn new(node_id: Option<u64>) -> Self {
51 Self {
52 node_id,
53 files_per_region: Default::default(),
54 }
55 }
56
57 fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58 let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) {
59 file_refs.clone()
60 } else {
61 return None;
63 };
64
65 if file_refs.files.is_empty() {
66 return Some(HashSet::new());
69 }
70
71 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73 debug!(
74 "Get file refs for region {}, node {:?}, {} files",
75 region_id,
76 self.node_id,
77 ref_file_set.len(),
78 );
79
80 Some(ref_file_set)
81 }
82
83 #[allow(unused)]
97 pub(crate) async fn get_snapshot_of_unmanifested_refs(
98 &self,
99 regions: Vec<MitoRegionRef>,
100 ) -> Result<FileRefsManifest> {
101 let mut ref_files = HashMap::new();
102 for region_id in regions.iter().map(|r| r.region_id()) {
103 if let Some(files) = self.ref_file_set(region_id) {
104 ref_files.insert(region_id, files);
105 }
106 }
107
108 let mut in_manifest_files = HashSet::new();
109 let mut manifest_version = HashMap::new();
110
111 for r in ®ions {
112 let manifest = r.manifest_ctx.manifest().await;
113 let files = manifest.files.keys().cloned().collect::<Vec<_>>();
114 in_manifest_files.extend(files);
115 manifest_version.insert(r.region_id(), manifest.manifest_version);
116 }
117
118 let ref_files_excluding_in_manifest = ref_files
119 .iter()
120 .map(|(r, f)| {
121 (
122 *r,
123 f.iter()
124 .filter_map(|f| {
125 (!in_manifest_files.contains(&f.file_id)).then_some(f.file_id)
126 })
127 .collect::<HashSet<_>>(),
128 )
129 })
130 .collect();
131 Ok(FileRefsManifest {
132 file_refs: ref_files_excluding_in_manifest,
133 manifest_version,
134 })
135 }
136
137 pub fn add_file(&self, file_meta: &FileMeta) {
141 let region_id = file_meta.region_id;
142 let mut is_new = false;
143 {
144 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
145 self.files_per_region
146 .entry(region_id)
147 .and_modify(|refs| {
148 refs.files
149 .entry(file_ref.clone())
150 .and_modify(|count| *count += 1)
151 .or_insert_with(|| {
152 is_new = true;
153 1
154 });
155 })
156 .or_insert_with(|| RegionFileRefs {
157 files: HashMap::from_iter([(file_ref, 1)]),
158 });
159 }
160 if is_new {
161 GC_REF_FILE_CNT.inc();
162 }
163 }
164
165 pub fn remove_file(&self, file_meta: &FileMeta) {
168 let region_id = file_meta.region_id;
169 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
170
171 let mut remove_table_entry = false;
172 let mut remove_file_ref = false;
173 let mut file_cnt = 0;
174
175 let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
176 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
177 if *count > 0 {
178 *count -= 1;
179 }
180 if *count == 0 {
181 remove_file_ref = true;
182 }
183 });
184 if let std::collections::hash_map::Entry::Occupied(o) = entry
185 && remove_file_ref
186 {
187 o.remove_entry();
188 }
189
190 file_cnt = refs.files.len();
191
192 if refs.files.is_empty() {
193 remove_table_entry = true;
194 }
195 });
196
197 if let Entry::Occupied(o) = region_ref
198 && remove_table_entry
199 {
200 o.remove_entry();
201 }
202 if remove_file_ref {
203 GC_REF_FILE_CNT.dec();
204 }
205 }
206
207 pub fn node_id(&self) -> Option<u64> {
208 self.node_id
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use std::num::NonZeroU64;
215
216 use smallvec::SmallVec;
217 use store_api::storage::{FileId, RegionId};
218
219 use super::*;
220 use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
221
222 #[tokio::test]
223 async fn test_file_ref_mgr() {
224 common_telemetry::init_default_ut_logging();
225
226 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
227
228 let file_ref_mgr = FileReferenceManager::new(None);
229
230 let file_meta = FileMeta {
231 region_id: sst_file_id.region_id(),
232 file_id: sst_file_id.file_id(),
233 time_range: FileTimeRange::default(),
234 level: 0,
235 file_size: 4096,
236 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
237 index_file_size: 4096,
238 index_file_id: None,
239 num_rows: 1024,
240 num_row_groups: 1,
241 sequence: NonZeroU64::new(4096),
242 partition_expr: None,
243 num_series: 0,
244 };
245
246 file_ref_mgr.add_file(&file_meta);
247
248 assert_eq!(
249 file_ref_mgr
250 .files_per_region
251 .get(&file_meta.region_id)
252 .unwrap()
253 .files,
254 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
255 );
256
257 file_ref_mgr.add_file(&file_meta);
258
259 let expected_region_ref_manifest =
260 HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
261
262 assert_eq!(
263 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
264 expected_region_ref_manifest
265 );
266
267 assert_eq!(
268 file_ref_mgr
269 .files_per_region
270 .get(&file_meta.region_id)
271 .unwrap()
272 .files,
273 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
274 );
275
276 assert_eq!(
277 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
278 expected_region_ref_manifest
279 );
280
281 file_ref_mgr.remove_file(&file_meta);
282
283 assert_eq!(
284 file_ref_mgr
285 .files_per_region
286 .get(&file_meta.region_id)
287 .unwrap()
288 .files,
289 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
290 );
291
292 assert_eq!(
293 file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
294 expected_region_ref_manifest
295 );
296
297 file_ref_mgr.remove_file(&file_meta);
298
299 assert!(
300 file_ref_mgr
301 .files_per_region
302 .get(&file_meta.region_id)
303 .is_none(),
304 "{:?}",
305 file_ref_mgr.files_per_region
306 );
307
308 assert!(
309 file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
310 "{:?}",
311 file_ref_mgr.files_per_region
312 );
313 }
314}