mito2/sst/
file_ref.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
/// File references for a region.
///
/// It contains all files referenced by the region, together with their
/// reference counts.
#[derive(Debug, Clone, Default)]
pub struct RegionFileRefs {
    /// Maps each [`FileRef`] to its reference count, meaning how many
    /// `FileHandleInner`s are opened for this file.
    pub files: HashMap<FileRef, usize>,
}
34
/// Manages all file references in one datanode.
///
/// It keeps track of which files are referenced, grouped by region id.
/// This is useful for ensuring that files are not deleted while they are still in use by any
/// query.
#[derive(Debug)]
pub struct FileReferenceManager {
    /// Datanode id, used to determine the tmp ref file name.
    node_id: Option<u64>,
    /// Per-region reference counts of files, keyed by region id.
    ///
    /// TODO(discord9): use no hash hasher since table id is sequential.
    files_per_region: DashMap<RegionId, RegionFileRefs>,
    /// Whether global GC is enabled.
    /// This is only meaningful when using object store (not local filesystem).
    gc_enabled: bool,
}
49
/// Shared, reference-counted handle to a [`FileReferenceManager`].
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
51
52impl FileReferenceManager {
53    pub fn new(node_id: Option<u64>) -> Self {
54        Self {
55            node_id,
56            files_per_region: Default::default(),
57            gc_enabled: false,
58        }
59    }
60
61    /// Creates a new FileReferenceManager with GC configuration.
62    pub fn with_gc_enabled(node_id: Option<u64>, gc_enabled: bool) -> Self {
63        Self {
64            node_id,
65            files_per_region: Default::default(),
66            gc_enabled,
67        }
68    }
69
    /// Returns whether global GC is enabled, as configured when the manager was created.
    ///
    /// This is useful for determining the file deletion strategy:
    /// - If GC is enabled (and using object store), files are tracked but not immediately deleted.
    /// - If GC is disabled (or using local filesystem), files are deleted immediately.
    pub fn is_gc_enabled(&self) -> bool {
        self.gc_enabled
    }
78
79    fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
80        let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
81            file_refs.clone()
82        } else {
83            // region id not found.
84            return None;
85        };
86
87        if file_refs.files.is_empty() {
88            // still return an empty manifest to indicate no files are referenced.
89            // and differentiate from error case where table_id not found.
90            return Some(HashSet::new());
91        }
92
93        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
94
95        debug!(
96            "Get file refs for region {}, node {:?}, {} files",
97            region_id,
98            self.node_id,
99            ref_file_set.len(),
100        );
101
102        Some(ref_file_set)
103    }
104
105    /// Gets all ref files for the given regions, meaning all open FileHandles for those regions
106    /// and from related regions' manifests.
107    /// `query_regions_for_mem` queries for in memory file handles.
108    /// `related_regions_in_manifest` queries for related regions' manifests to get more file refs of given region ids.
109    pub(crate) async fn get_snapshot_of_file_refs(
110        &self,
111        query_regions_for_mem: Vec<MitoRegionRef>,
112        dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)>,
113    ) -> Result<FileRefsManifest> {
114        let mut ref_files = HashMap::new();
115        // get from in memory file handles
116        for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
117            if let Some(files) = self.ref_file_set(region_id) {
118                ref_files.insert(region_id, files);
119            }
120        }
121
122        let mut manifest_version = HashMap::new();
123
124        let mut cross_region_refs = HashMap::new();
125
126        // get file refs from related regions' manifests
127        for (dst_region, src_regions) in &dst_region_to_src_regions {
128            let manifest = dst_region.manifest_ctx.manifest().await;
129            for meta in manifest.files.values() {
130                if src_regions.contains(&meta.region_id) {
131                    cross_region_refs
132                        .entry(meta.region_id)
133                        .or_insert_with(HashSet::new)
134                        .insert(dst_region.region_id());
135                    // since gc couldn't happen together with repartition
136                    // (both the queries and related_region acquire region read lock), no need to worry about
137                    // staging manifest in repartition here.
138                    ref_files
139                        .entry(meta.region_id)
140                        .or_insert_with(HashSet::new)
141                        .insert(FileRef::new(
142                            meta.region_id,
143                            meta.file_id,
144                            meta.index_version(),
145                        ));
146                }
147            }
148            // not sure if related region's manifest version is needed, but record it for now.
149            manifest_version.insert(dst_region.region_id(), manifest.manifest_version);
150        }
151
152        for r in &query_regions_for_mem {
153            let manifest = r.manifest_ctx.manifest().await;
154            // remove in manifest files for smaller size, since gc worker read from manifest later.
155            ref_files.entry(r.region_id()).and_modify(|refs| {
156                *refs = std::mem::take(refs)
157                    .into_iter()
158                    .filter(|f| !manifest.files.contains_key(&f.file_id))
159                    .collect();
160            });
161            manifest_version.insert(r.region_id(), manifest.manifest_version);
162        }
163
164        // simply return all ref files, no manifest version filtering for now.
165        Ok(FileRefsManifest {
166            file_refs: ref_files,
167            manifest_version,
168            cross_region_refs,
169        })
170    }
171
172    /// Adds a new file reference.
173    /// Also records the access layer for the table if not exists.
174    /// The access layer will be used to upload ref file to object storage.
175    pub fn add_file(&self, file_meta: &FileMeta) {
176        let region_id = file_meta.region_id;
177        let mut is_new = false;
178        {
179            let file_ref = FileRef::new(
180                file_meta.region_id,
181                file_meta.file_id,
182                file_meta.index_version(),
183            );
184            self.files_per_region
185                .entry(region_id)
186                .and_modify(|refs| {
187                    refs.files
188                        .entry(file_ref.clone())
189                        .and_modify(|count| *count += 1)
190                        .or_insert_with(|| {
191                            is_new = true;
192                            1
193                        });
194                })
195                .or_insert_with(|| RegionFileRefs {
196                    files: HashMap::from_iter([(file_ref, 1)]),
197                });
198        }
199        if is_new {
200            GC_REF_FILE_CNT.inc();
201        }
202    }
203
204    /// Removes a file reference.
205    /// If the reference count reaches zero, the file reference will be removed from the manager.
206    pub fn remove_file(&self, file_meta: &FileMeta) {
207        let region_id = file_meta.region_id;
208        let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
209
210        let mut remove_table_entry = false;
211        let mut remove_file_ref = false;
212        let mut file_cnt = 0;
213
214        let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
215            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
216                if *count > 0 {
217                    *count -= 1;
218                }
219                if *count == 0 {
220                    remove_file_ref = true;
221                }
222            });
223            if let std::collections::hash_map::Entry::Occupied(o) = entry
224                && remove_file_ref
225            {
226                o.remove_entry();
227            }
228
229            file_cnt = refs.files.len();
230
231            if refs.files.is_empty() {
232                remove_table_entry = true;
233            }
234        });
235
236        if let Entry::Occupied(o) = region_ref
237            && remove_table_entry
238        {
239            o.remove_entry();
240        }
241        if remove_file_ref {
242            GC_REF_FILE_CNT.dec();
243        }
244    }
245
    /// Returns the datanode id this manager was created with, if any.
    pub fn node_id(&self) -> Option<u64> {
        self.node_id
    }
249}
250
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Walks a single file through add/add/remove/remove and checks the tracked
    /// reference counts and the reported ref file set along the way.
    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let file_ref_mgr = FileReferenceManager::new(None);

        let file_meta = FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        };

        let region_id = file_meta.region_id;
        let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());

        // Snapshot of the region's ref-count map, or `None` when the region is untracked.
        let tracked_counts = || {
            file_ref_mgr
                .files_per_region
                .get(&region_id)
                .map(|refs| refs.files.clone())
        };
        // The ref-count map expected while the single file is held `count` times.
        let counts_of = |count: usize| -> Option<HashMap<FileRef, usize>> {
            Some(HashMap::from_iter([(file_ref.clone(), count)]))
        };
        let expected_region_ref_manifest: HashSet<FileRef> =
            HashSet::from_iter([file_ref.clone()]);

        // First reference: the file shows up with a count of one.
        file_ref_mgr.add_file(&file_meta);
        assert_eq!(tracked_counts(), counts_of(1));

        // Second reference: the count grows, the set of referenced files does not.
        file_ref_mgr.add_file(&file_meta);
        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            expected_region_ref_manifest
        );
        assert_eq!(tracked_counts(), counts_of(2));
        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            expected_region_ref_manifest
        );

        // Dropping one reference keeps the file tracked.
        file_ref_mgr.remove_file(&file_meta);
        assert_eq!(tracked_counts(), counts_of(1));
        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            expected_region_ref_manifest
        );

        // Dropping the last reference removes the file and the now-empty region entry.
        file_ref_mgr.remove_file(&file_meta);
        assert!(
            tracked_counts().is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );
        assert!(
            file_ref_mgr.ref_file_set(region_id).is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );
    }
}