1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use serde::{Deserialize, Serialize};
21use store_api::storage::{RegionId, TableId};
22use store_api::ManifestVersion;
23
24use crate::error::Result;
25use crate::metrics::GC_REF_FILE_CNT;
26use crate::region::RegionMapRef;
27use crate::sst::file::{FileId, FileMeta};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct FileRef {
31 pub region_id: RegionId,
32 pub file_id: FileId,
33}
34
35impl FileRef {
36 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
37 Self { region_id, file_id }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
44pub struct TableFileRefs {
45 pub files: HashMap<FileRef, usize>,
47}
48
49#[derive(Debug)]
55pub struct FileReferenceManager {
56 node_id: Option<u64>,
58 files_per_table: DashMap<TableId, TableFileRefs>,
60}
61
62pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
63
64#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
66pub struct TableFileRefsManifest {
67 pub file_refs: HashSet<FileRef>,
68 pub manifest_version: HashMap<RegionId, ManifestVersion>,
70}
71
72impl FileReferenceManager {
73 pub fn new(node_id: Option<u64>) -> Self {
74 Self {
75 node_id,
76 files_per_table: Default::default(),
77 }
78 }
79
80 fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
81 let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
82 file_refs.clone()
83 } else {
84 return None;
87 };
88
89 if file_refs.files.is_empty() {
90 return Some(HashSet::new());
93 }
94
95 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
96
97 debug!(
98 "Get file refs for table {}, node {:?}, {} files",
99 table_id,
100 self.node_id,
101 ref_file_set.len(),
102 );
103
104 Some(ref_file_set)
105 }
106
107 #[allow(unused)]
121 pub(crate) async fn get_snapshot_of_unmanifested_refs(
122 &self,
123 table_id: TableId,
124 region_map: &RegionMapRef,
125 ) -> Result<TableFileRefsManifest> {
126 let Some(ref_files) = self.ref_file_set(table_id) else {
127 return Ok(Default::default());
128 };
129 let region_list = region_map.list_regions();
130 let table_regions = region_list
131 .iter()
132 .filter(|r| r.region_id().table_id() == table_id)
133 .collect::<Vec<_>>();
134
135 let mut in_manifest_files = HashSet::new();
136 let mut manifest_version = HashMap::new();
137
138 for r in &table_regions {
139 let manifest = r.manifest_ctx.manifest().await;
140 let files = manifest.files.keys().cloned().collect::<Vec<_>>();
141 in_manifest_files.extend(files);
142 manifest_version.insert(r.region_id(), manifest.manifest_version);
143 }
144
145 let ref_files_excluding_in_manifest = ref_files
146 .iter()
147 .filter(|f| !in_manifest_files.contains(&f.file_id))
148 .cloned()
149 .collect::<HashSet<_>>();
150
151 Ok(TableFileRefsManifest {
152 file_refs: ref_files_excluding_in_manifest,
153 manifest_version,
154 })
155 }
156
157 pub fn add_file(&self, file_meta: &FileMeta) {
161 let table_id = file_meta.region_id.table_id();
162 let mut is_new = false;
163 {
164 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
165 self.files_per_table
166 .entry(table_id)
167 .and_modify(|refs| {
168 refs.files
169 .entry(file_ref.clone())
170 .and_modify(|count| *count += 1)
171 .or_insert_with(|| {
172 is_new = true;
173 1
174 });
175 })
176 .or_insert_with(|| TableFileRefs {
177 files: HashMap::from_iter([(file_ref, 1)]),
178 });
179 }
180 if is_new {
181 GC_REF_FILE_CNT.inc();
182 }
183 }
184
185 pub fn remove_file(&self, file_meta: &FileMeta) {
188 let table_id = file_meta.region_id.table_id();
189 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
190
191 let mut remove_table_entry = false;
192 let mut remove_file_ref = false;
193 let mut file_cnt = 0;
194
195 let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
196 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
197 if *count > 0 {
198 *count -= 1;
199 }
200 if *count == 0 {
201 remove_file_ref = true;
202 }
203 });
204 if let std::collections::hash_map::Entry::Occupied(o) = entry
205 && remove_file_ref
206 {
207 o.remove_entry();
208 }
209
210 file_cnt = refs.files.len();
211
212 if refs.files.is_empty() {
213 remove_table_entry = true;
214 }
215 });
216
217 if let Entry::Occupied(o) = table_ref
218 && remove_table_entry
219 {
220 o.remove_entry();
221 }
222 if remove_file_ref {
223 GC_REF_FILE_CNT.dec();
224 }
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use std::num::NonZeroU64;
231
232 use smallvec::SmallVec;
233 use store_api::storage::RegionId;
234
235 use super::*;
236 use crate::sst::file::{FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};
237
238 #[tokio::test]
239 async fn test_file_ref_mgr() {
240 common_telemetry::init_default_ut_logging();
241
242 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
243
244 let file_ref_mgr = FileReferenceManager::new(None);
245
246 let file_meta = FileMeta {
247 region_id: sst_file_id.region_id(),
248 file_id: sst_file_id.file_id(),
249 time_range: FileTimeRange::default(),
250 level: 0,
251 file_size: 4096,
252 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
253 index_file_size: 4096,
254 num_rows: 1024,
255 num_row_groups: 1,
256 sequence: NonZeroU64::new(4096),
257 };
258
259 file_ref_mgr.add_file(&file_meta);
260
261 assert_eq!(
262 file_ref_mgr.files_per_table.get(&0).unwrap().files,
263 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
264 );
265
266 file_ref_mgr.add_file(&file_meta);
267
268 let expected_table_ref_manifest =
269 HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
270
271 assert_eq!(
272 file_ref_mgr.ref_file_set(0).unwrap(),
273 expected_table_ref_manifest
274 );
275
276 assert_eq!(
277 file_ref_mgr.files_per_table.get(&0).unwrap().files,
278 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
279 );
280
281 assert_eq!(
282 file_ref_mgr.ref_file_set(0).unwrap(),
283 expected_table_ref_manifest
284 );
285
286 file_ref_mgr.remove_file(&file_meta);
287
288 assert_eq!(
289 file_ref_mgr.files_per_table.get(&0).unwrap().files,
290 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
291 );
292
293 assert_eq!(
294 file_ref_mgr.ref_file_set(0).unwrap(),
295 expected_table_ref_manifest
296 );
297
298 file_ref_mgr.remove_file(&file_meta);
299
300 assert!(
301 file_ref_mgr.files_per_table.get(&0).is_none(),
302 "{:?}",
303 file_ref_mgr.files_per_table
304 );
305
306 assert!(
307 file_ref_mgr.ref_file_set(0).is_none(),
308 "{:?}",
309 file_ref_mgr.files_per_table
310 );
311 }
312}