mito2/sst/file_ref.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
/// File references for a region.
/// It contains all files referenced by the region.
#[derive(Debug, Clone, Default)]
pub struct RegionFileRefs {
    /// Maps each referenced file to its reference count, i.e. how many `FileHandleInner`s
    /// are currently open for that file. `FileReferenceManager::remove_file` drops an entry
    /// once its count reaches zero.
    pub files: HashMap<FileRef, usize>,
}
34
/// Manages all file references in one datanode.
/// It keeps track of which files are referenced, grouped by region id.
/// This is useful for ensuring that files are not deleted while they are still in use by any
/// query.
#[derive(Debug)]
pub struct FileReferenceManager {
    /// Datanode id. Used to determine tmp ref file name.
    node_id: Option<u64>,
    /// Per-region file reference counts. A region entry is removed once it no longer
    /// references any file.
    /// TODO(discord9): use no hash hasher since table id is sequential.
    files_per_region: DashMap<RegionId, RegionFileRefs>,
}

/// Shared handle to a [`FileReferenceManager`].
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50    pub fn new(node_id: Option<u64>) -> Self {
51        Self {
52            node_id,
53            files_per_region: Default::default(),
54        }
55    }
56
57    fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58        let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
59            file_refs.clone()
60        } else {
61            // region id not found.
62            return None;
63        };
64
65        if file_refs.files.is_empty() {
66            // still return an empty manifest to indicate no files are referenced.
67            // and differentiate from error case where table_id not found.
68            return Some(HashSet::new());
69        }
70
71        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73        debug!(
74            "Get file refs for region {}, node {:?}, {} files",
75            region_id,
76            self.node_id,
77            ref_file_set.len(),
78        );
79
80        Some(ref_file_set)
81    }
82
83    /// Gets all ref files for the given table id, excluding those already in region manifest.
84    ///
85    /// It's safe if manifest version became outdated when gc worker is called, as gc worker will check the changes between those two versions and act accordingly to make sure to get the real truly tmp ref file sets at the time of old manifest version.
86    ///
87    /// TODO(discord9): Since query will only possible refer to files in latest manifest when it's started, the only true risks is files removed from manifest between old version(when reading refs) and new version(at gc worker), so in case of having outdated manifest version, gc worker should make sure not to delete those files(Until next gc round which will use the latest manifest version and handle those files normally).
88    /// or perhaps using a two-phase commit style process where it proposes a set of files for deletion and then verifies no new references have appeared before committing the delete.
89    ///
90    /// gc worker could do this:
91    /// 1. if can get the files that got removed from old manifest to new manifest, then shouldn't delete those files even if they are not in tmp ref file, other files can be normally handled(deleted if not in use, otherwise keep)
92    ///    and report back allow next gc round to handle those files with newer tmp ref file sets.
93    /// 2. if can't get the files that got removed from old manifest to new manifest(possible if just did a checkpoint),
94    ///    then can do nothing as can't sure whether a file is truly unused or just tmp ref file sets haven't report it, so need to report back and try next gc round to handle those files with newer tmp ref file sets.
95    ///
96    #[allow(unused)]
97    pub(crate) async fn get_snapshot_of_unmanifested_refs(
98        &self,
99        regions: Vec<MitoRegionRef>,
100    ) -> Result<FileRefsManifest> {
101        let mut ref_files = HashMap::new();
102        for region_id in regions.iter().map(|r| r.region_id()) {
103            if let Some(files) = self.ref_file_set(region_id) {
104                ref_files.insert(region_id, files);
105            }
106        }
107
108        let mut in_manifest_files = HashSet::new();
109        let mut manifest_version = HashMap::new();
110
111        for r in &regions {
112            let manifest = r.manifest_ctx.manifest().await;
113            let files = manifest.files.keys().cloned().collect::<Vec<_>>();
114            in_manifest_files.extend(files);
115            manifest_version.insert(r.region_id(), manifest.manifest_version);
116        }
117
118        let ref_files_excluding_in_manifest = ref_files
119            .iter()
120            .map(|(r, f)| {
121                (
122                    *r,
123                    f.iter()
124                        .filter_map(|f| {
125                            (!in_manifest_files.contains(&f.file_id)).then_some(f.file_id)
126                        })
127                        .collect::<HashSet<_>>(),
128                )
129            })
130            .collect();
131        Ok(FileRefsManifest {
132            file_refs: ref_files_excluding_in_manifest,
133            manifest_version,
134        })
135    }
136
137    /// Adds a new file reference.
138    /// Also records the access layer for the table if not exists.
139    /// The access layer will be used to upload ref file to object storage.
140    pub fn add_file(&self, file_meta: &FileMeta) {
141        let region_id = file_meta.region_id;
142        let mut is_new = false;
143        {
144            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
145            self.files_per_region
146                .entry(region_id)
147                .and_modify(|refs| {
148                    refs.files
149                        .entry(file_ref.clone())
150                        .and_modify(|count| *count += 1)
151                        .or_insert_with(|| {
152                            is_new = true;
153                            1
154                        });
155                })
156                .or_insert_with(|| RegionFileRefs {
157                    files: HashMap::from_iter([(file_ref, 1)]),
158                });
159        }
160        if is_new {
161            GC_REF_FILE_CNT.inc();
162        }
163    }
164
165    /// Removes a file reference.
166    /// If the reference count reaches zero, the file reference will be removed from the manager.
167    pub fn remove_file(&self, file_meta: &FileMeta) {
168        let region_id = file_meta.region_id;
169        let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
170
171        let mut remove_table_entry = false;
172        let mut remove_file_ref = false;
173        let mut file_cnt = 0;
174
175        let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
176            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
177                if *count > 0 {
178                    *count -= 1;
179                }
180                if *count == 0 {
181                    remove_file_ref = true;
182                }
183            });
184            if let std::collections::hash_map::Entry::Occupied(o) = entry
185                && remove_file_ref
186            {
187                o.remove_entry();
188            }
189
190            file_cnt = refs.files.len();
191
192            if refs.files.is_empty() {
193                remove_table_entry = true;
194            }
195        });
196
197        if let Entry::Occupied(o) = region_ref
198            && remove_table_entry
199        {
200            o.remove_entry();
201        }
202        if remove_file_ref {
203            GC_REF_FILE_CNT.dec();
204        }
205    }
206
207    pub fn node_id(&self) -> Option<u64> {
208        self.node_id
209    }
210}
211
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Builds a `FileMeta` for a fresh random file in `region_id`.
    fn new_file_meta(region_id: RegionId) -> FileMeta {
        let sst_file_id = RegionFileId::new(region_id, FileId::random());
        FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            index_file_id: None,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        }
    }

    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let file_ref_mgr = FileReferenceManager::new(None);
        let file_meta = new_file_meta(RegionId::new(0, 0));

        file_ref_mgr.add_file(&file_meta);

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
        );

        file_ref_mgr.add_file(&file_meta);

        let expected_region_ref_manifest =
            HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
        );

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        file_ref_mgr.remove_file(&file_meta);

        assert_eq!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .unwrap()
                .files,
            HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
        );

        assert_eq!(
            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
            expected_region_ref_manifest
        );

        file_ref_mgr.remove_file(&file_meta);

        assert!(
            file_ref_mgr
                .files_per_region
                .get(&file_meta.region_id)
                .is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );

        assert!(
            file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
            "{:?}",
            file_ref_mgr.files_per_region
        );
    }

    /// Several files in one region, plus removals of unknown files/regions.
    #[test]
    fn test_file_ref_mgr_multiple_files() {
        let file_ref_mgr = FileReferenceManager::new(Some(1));
        let region_id = RegionId::new(1, 1);
        let file_a = new_file_meta(region_id);
        let file_b = new_file_meta(region_id);
        let untracked_file = new_file_meta(region_id);
        let other_region_file = new_file_meta(RegionId::new(1, 2));

        file_ref_mgr.add_file(&file_a);
        file_ref_mgr.add_file(&file_b);

        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            HashSet::from_iter([
                FileRef::new(region_id, file_a.file_id),
                FileRef::new(region_id, file_b.file_id),
            ])
        );

        // Removing a file that was never added, in a known or unknown region, is a no-op.
        file_ref_mgr.remove_file(&untracked_file);
        file_ref_mgr.remove_file(&other_region_file);
        assert!(
            file_ref_mgr
                .ref_file_set(other_region_file.region_id)
                .is_none()
        );
        assert_eq!(file_ref_mgr.ref_file_set(region_id).unwrap().len(), 2);

        // The region entry survives while at least one file is still referenced.
        file_ref_mgr.remove_file(&file_a);
        assert_eq!(
            file_ref_mgr.ref_file_set(region_id).unwrap(),
            HashSet::from_iter([FileRef::new(region_id, file_b.file_id)])
        );

        file_ref_mgr.remove_file(&file_b);
        assert!(file_ref_mgr.ref_file_set(region_id).is_none());

        // Extra removals after the region entry is gone stay no-ops.
        file_ref_mgr.remove_file(&file_b);
        assert!(file_ref_mgr.ref_file_set(region_id).is_none());
    }
}