mito2/sst/
file_ref.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27/// File references for a region.
28/// It contains all files referenced by the region.
29#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31    /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
32    pub files: HashMap<FileRef, usize>,
33}
34
35/// Manages all file references in one datanode.
36/// It keeps track of which files are referenced and group by table ids.
37/// This is useful for ensuring that files are not deleted while they are still in use by any
38/// query.
39#[derive(Debug)]
40pub struct FileReferenceManager {
41    /// Datanode id. used to determine tmp ref file name.
42    node_id: Option<u64>,
43    /// TODO(discord9): use no hash hasher since table id is sequential.
44    files_per_region: DashMap<RegionId, RegionFileRefs>,
45}
46
47pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50    pub fn new(node_id: Option<u64>) -> Self {
51        Self {
52            node_id,
53            files_per_region: Default::default(),
54        }
55    }
56
57    fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58        let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
59            file_refs.clone()
60        } else {
61            // region id not found.
62            return None;
63        };
64
65        if file_refs.files.is_empty() {
66            // still return an empty manifest to indicate no files are referenced.
67            // and differentiate from error case where table_id not found.
68            return Some(HashSet::new());
69        }
70
71        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73        debug!(
74            "Get file refs for region {}, node {:?}, {} files",
75            region_id,
76            self.node_id,
77            ref_file_set.len(),
78        );
79
80        Some(ref_file_set)
81    }
82
83    /// Gets all ref files for the given regions, meaning all open FileHandles for those regions
84    /// and from related regions' manifests.
85    pub(crate) async fn get_snapshot_of_file_refs(
86        &self,
87        query_regions: Vec<MitoRegionRef>,
88        related_regions: Vec<(MitoRegionRef, Vec<RegionId>)>,
89    ) -> Result<FileRefsManifest> {
90        let mut ref_files = HashMap::new();
91        // get from in memory file handles
92        for region_id in query_regions.iter().map(|r| r.region_id()) {
93            if let Some(files) = self.ref_file_set(region_id) {
94                ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
95            }
96        }
97
98        let mut manifest_version = HashMap::new();
99
100        for r in &query_regions {
101            let manifest = r.manifest_ctx.manifest().await;
102            manifest_version.insert(r.region_id(), manifest.manifest_version);
103        }
104
105        // get file refs from related regions' manifests
106        for (related_region, queries) in &related_regions {
107            let queries = queries.iter().cloned().collect::<HashSet<_>>();
108            let manifest = related_region.manifest_ctx.manifest().await;
109            for meta in manifest.files.values() {
110                if queries.contains(&meta.region_id) {
111                    ref_files
112                        .entry(meta.region_id)
113                        .or_insert_with(HashSet::new)
114                        .insert(meta.file_id);
115                }
116            }
117            // not sure if related region's manifest version is needed, but record it for now.
118            manifest_version.insert(related_region.region_id(), manifest.manifest_version);
119        }
120
121        // simply return all ref files, no manifest version filtering for now.
122        Ok(FileRefsManifest {
123            file_refs: ref_files,
124            manifest_version,
125        })
126    }
127
128    /// Adds a new file reference.
129    /// Also records the access layer for the table if not exists.
130    /// The access layer will be used to upload ref file to object storage.
131    pub fn add_file(&self, file_meta: &FileMeta) {
132        let region_id = file_meta.region_id;
133        let mut is_new = false;
134        {
135            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
136            self.files_per_region
137                .entry(region_id)
138                .and_modify(|refs| {
139                    refs.files
140                        .entry(file_ref.clone())
141                        .and_modify(|count| *count += 1)
142                        .or_insert_with(|| {
143                            is_new = true;
144                            1
145                        });
146                })
147                .or_insert_with(|| RegionFileRefs {
148                    files: HashMap::from_iter([(file_ref, 1)]),
149                });
150        }
151        if is_new {
152            GC_REF_FILE_CNT.inc();
153        }
154    }
155
156    /// Removes a file reference.
157    /// If the reference count reaches zero, the file reference will be removed from the manager.
158    pub fn remove_file(&self, file_meta: &FileMeta) {
159        let region_id = file_meta.region_id;
160        let file_ref = FileRef::new(region_id, file_meta.file_id);
161
162        let mut remove_table_entry = false;
163        let mut remove_file_ref = false;
164        let mut file_cnt = 0;
165
166        let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
167            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
168                if *count > 0 {
169                    *count -= 1;
170                }
171                if *count == 0 {
172                    remove_file_ref = true;
173                }
174            });
175            if let std::collections::hash_map::Entry::Occupied(o) = entry
176                && remove_file_ref
177            {
178                o.remove_entry();
179            }
180
181            file_cnt = refs.files.len();
182
183            if refs.files.is_empty() {
184                remove_table_entry = true;
185            }
186        });
187
188        if let Entry::Occupied(o) = region_ref
189            && remove_table_entry
190        {
191            o.remove_entry();
192        }
193        if remove_file_ref {
194            GC_REF_FILE_CNT.dec();
195        }
196    }
197
198    pub fn node_id(&self) -> Option<u64> {
199        self.node_id
200    }
201}
202
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Builds a small `FileMeta` for `file_id` in `region_id` with fixed dummy statistics.
    fn new_file_meta(region_id: RegionId, file_id: FileId) -> FileMeta {
        FileMeta {
            region_id,
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            index_file_id: None,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        }
    }

    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());

        let file_ref_mgr = FileReferenceManager::new(None);

        let file_meta = new_file_meta(sst_file_id.region_id(), sst_file_id.file_id());

        file_ref_mgr.add_file(&file_meta);

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
        );

        file_ref_mgr.add_file(&file_meta);

        let expected_region_ref_manifest =
            HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
        );

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        file_ref_mgr.remove_file(&file_meta);

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
        );

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        file_ref_mgr.remove_file(&file_meta);

        assert!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );

        assert!(
            file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );
    }

    #[test]
    fn test_file_ref_mgr_multiple_files_and_untracked_removal() {
        let region_id = RegionId::new(1, 1);
        let file_a = new_file_meta(region_id, FileId::random());
        let file_b = new_file_meta(region_id, FileId::random());
        let file_ref_mgr = FileReferenceManager::new(Some(1));

        // Releasing a file of an untracked region is a no-op.
        file_ref_mgr.remove_file(&file_a);
        assert!(file_ref_mgr.ref_file_set(region_id).is_none());

        // A second distinct file joins the existing region entry.
        file_ref_mgr.add_file(&file_a);
        file_ref_mgr.add_file(&file_b);
        assert_eq!(
            file_ref_mgr.files_per_region.get(&region_id).unwrap().files,
            HashMap::from_iter([
                (FileRef::new(region_id, file_a.file_id), 1),
                (FileRef::new(region_id, file_b.file_id), 1),
            ])
        );

        // Releasing a file the region does not reference leaves the others untouched.
        let unreferenced = new_file_meta(region_id, FileId::random());
        file_ref_mgr.remove_file(&unreferenced);
        assert_eq!(file_ref_mgr.ref_file_set(region_id).unwrap().len(), 2);

        // The region entry survives until its last referenced file is released.
        file_ref_mgr.remove_file(&file_a);
        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            HashSet::from_iter([FileRef::new(region_id, file_b.file_id)])
        );
        file_ref_mgr.remove_file(&file_b);
        assert!(file_ref_mgr.ref_file_set(region_id).is_none());
    }
}