1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use serde::{Deserialize, Serialize};
21use store_api::ManifestVersion;
22use store_api::storage::{FileId, RegionId, TableId};
23
24use crate::error::Result;
25use crate::metrics::GC_REF_FILE_CNT;
26use crate::region::RegionMapRef;
27use crate::sst::file::FileMeta;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct FileRef {
31 pub region_id: RegionId,
32 pub file_id: FileId,
33}
34
35impl FileRef {
36 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
37 Self { region_id, file_id }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
44pub struct TableFileRefs {
45 pub files: HashMap<FileRef, usize>,
47}
48
49#[derive(Debug)]
55pub struct FileReferenceManager {
56 node_id: Option<u64>,
58 files_per_table: DashMap<TableId, TableFileRefs>,
60}
61
62pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
63
64#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
66pub struct TableFileRefsManifest {
67 pub file_refs: HashSet<FileRef>,
68 pub manifest_version: HashMap<RegionId, ManifestVersion>,
70}
71
72impl FileReferenceManager {
73 pub fn new(node_id: Option<u64>) -> Self {
74 Self {
75 node_id,
76 files_per_table: Default::default(),
77 }
78 }
79
80 fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
81 let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
82 file_refs.clone()
83 } else {
84 return None;
87 };
88
89 if file_refs.files.is_empty() {
90 return Some(HashSet::new());
93 }
94
95 let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
96
97 debug!(
98 "Get file refs for table {}, node {:?}, {} files",
99 table_id,
100 self.node_id,
101 ref_file_set.len(),
102 );
103
104 Some(ref_file_set)
105 }
106
107 #[allow(unused)]
121 pub(crate) async fn get_snapshot_of_unmanifested_refs(
122 &self,
123 table_id: TableId,
124 region_map: &RegionMapRef,
125 ) -> Result<TableFileRefsManifest> {
126 let Some(ref_files) = self.ref_file_set(table_id) else {
127 return Ok(Default::default());
128 };
129 let region_list = region_map.list_regions();
130 let table_regions = region_list
131 .iter()
132 .filter(|r| r.region_id().table_id() == table_id)
133 .collect::<Vec<_>>();
134
135 let mut in_manifest_files = HashSet::new();
136 let mut manifest_version = HashMap::new();
137
138 for r in &table_regions {
139 let manifest = r.manifest_ctx.manifest().await;
140 let files = manifest.files.keys().cloned().collect::<Vec<_>>();
141 in_manifest_files.extend(files);
142 manifest_version.insert(r.region_id(), manifest.manifest_version);
143 }
144
145 let ref_files_excluding_in_manifest = ref_files
146 .iter()
147 .filter(|f| !in_manifest_files.contains(&f.file_id))
148 .cloned()
149 .collect::<HashSet<_>>();
150
151 Ok(TableFileRefsManifest {
152 file_refs: ref_files_excluding_in_manifest,
153 manifest_version,
154 })
155 }
156
157 pub fn add_file(&self, file_meta: &FileMeta) {
161 let table_id = file_meta.region_id.table_id();
162 let mut is_new = false;
163 {
164 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
165 self.files_per_table
166 .entry(table_id)
167 .and_modify(|refs| {
168 refs.files
169 .entry(file_ref.clone())
170 .and_modify(|count| *count += 1)
171 .or_insert_with(|| {
172 is_new = true;
173 1
174 });
175 })
176 .or_insert_with(|| TableFileRefs {
177 files: HashMap::from_iter([(file_ref, 1)]),
178 });
179 }
180 if is_new {
181 GC_REF_FILE_CNT.inc();
182 }
183 }
184
185 pub fn remove_file(&self, file_meta: &FileMeta) {
188 let table_id = file_meta.region_id.table_id();
189 let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
190
191 let mut remove_table_entry = false;
192 let mut remove_file_ref = false;
193 let mut file_cnt = 0;
194
195 let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
196 let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
197 if *count > 0 {
198 *count -= 1;
199 }
200 if *count == 0 {
201 remove_file_ref = true;
202 }
203 });
204 if let std::collections::hash_map::Entry::Occupied(o) = entry
205 && remove_file_ref
206 {
207 o.remove_entry();
208 }
209
210 file_cnt = refs.files.len();
211
212 if refs.files.is_empty() {
213 remove_table_entry = true;
214 }
215 });
216
217 if let Entry::Occupied(o) = table_ref
218 && remove_table_entry
219 {
220 o.remove_entry();
221 }
222 if remove_file_ref {
223 GC_REF_FILE_CNT.dec();
224 }
225 }
226
227 pub fn node_id(&self) -> Option<u64> {
228 self.node_id
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use std::num::NonZeroU64;
235
236 use smallvec::SmallVec;
237 use store_api::storage::RegionId;
238
239 use super::*;
240 use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
241
242 #[tokio::test]
243 async fn test_file_ref_mgr() {
244 common_telemetry::init_default_ut_logging();
245
246 let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
247
248 let file_ref_mgr = FileReferenceManager::new(None);
249
250 let file_meta = FileMeta {
251 region_id: sst_file_id.region_id(),
252 file_id: sst_file_id.file_id(),
253 time_range: FileTimeRange::default(),
254 level: 0,
255 file_size: 4096,
256 available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
257 index_file_size: 4096,
258 num_rows: 1024,
259 num_row_groups: 1,
260 sequence: NonZeroU64::new(4096),
261 partition_expr: None,
262 };
263
264 file_ref_mgr.add_file(&file_meta);
265
266 assert_eq!(
267 file_ref_mgr.files_per_table.get(&0).unwrap().files,
268 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
269 );
270
271 file_ref_mgr.add_file(&file_meta);
272
273 let expected_table_ref_manifest =
274 HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
275
276 assert_eq!(
277 file_ref_mgr.ref_file_set(0).unwrap(),
278 expected_table_ref_manifest
279 );
280
281 assert_eq!(
282 file_ref_mgr.files_per_table.get(&0).unwrap().files,
283 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
284 );
285
286 assert_eq!(
287 file_ref_mgr.ref_file_set(0).unwrap(),
288 expected_table_ref_manifest
289 );
290
291 file_ref_mgr.remove_file(&file_meta);
292
293 assert_eq!(
294 file_ref_mgr.files_per_table.get(&0).unwrap().files,
295 HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
296 );
297
298 assert_eq!(
299 file_ref_mgr.ref_file_set(0).unwrap(),
300 expected_table_ref_manifest
301 );
302
303 file_ref_mgr.remove_file(&file_meta);
304
305 assert!(
306 file_ref_mgr.files_per_table.get(&0).is_none(),
307 "{:?}",
308 file_ref_mgr.files_per_table
309 );
310
311 assert!(
312 file_ref_mgr.ref_file_set(0).is_none(),
313 "{:?}",
314 file_ref_mgr.files_per_table
315 );
316 }
317}