mito2/sst/
file_ref.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use serde::{Deserialize, Serialize};
21use store_api::storage::{RegionId, TableId};
22use store_api::ManifestVersion;
23
24use crate::error::Result;
25use crate::metrics::GC_REF_FILE_CNT;
26use crate::region::RegionMapRef;
27use crate::sst::file::{FileId, FileMeta};
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct FileRef {
31    pub region_id: RegionId,
32    pub file_id: FileId,
33}
34
35impl FileRef {
36    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
37        Self { region_id, file_id }
38    }
39}
40
41/// File references for a table.
42/// It contains all files referenced by the table.
43#[derive(Debug, Clone, Default)]
44pub struct TableFileRefs {
45    /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
46    pub files: HashMap<FileRef, usize>,
47}
48
49/// Manages all file references in one datanode.
50/// It keeps track of which files are referenced and group by table ids.
51/// And periodically update the references to tmp file in object storage.
52/// This is useful for ensuring that files are not deleted while they are still in use by any
53/// query.
54#[derive(Debug)]
55pub struct FileReferenceManager {
56    /// Datanode id. used to determine tmp ref file name.
57    node_id: Option<u64>,
58    /// TODO(discord9): use no hash hasher since table id is sequential.
59    files_per_table: DashMap<TableId, TableFileRefs>,
60}
61
62pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
63
64/// The tmp file uploaded to object storage to record one table's file references.
65#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
66pub struct TableFileRefsManifest {
67    pub file_refs: HashSet<FileRef>,
68    /// Manifest version when this manifest is read for it's files
69    pub manifest_version: HashMap<RegionId, ManifestVersion>,
70}
71
72impl FileReferenceManager {
73    pub fn new(node_id: Option<u64>) -> Self {
74        Self {
75            node_id,
76            files_per_table: Default::default(),
77        }
78    }
79
80    fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
81        let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
82            file_refs.clone()
83        } else {
84            // still return an empty manifest to indicate no files are referenced.
85            // and differentiate from error case where table_id not found.
86            return None;
87        };
88
89        if file_refs.files.is_empty() {
90            // still return an empty manifest to indicate no files are referenced.
91            // and differentiate from error case where table_id not found.
92            return Some(HashSet::new());
93        }
94
95        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
96
97        debug!(
98            "Get file refs for table {}, node {:?}, {} files",
99            table_id,
100            self.node_id,
101            ref_file_set.len(),
102        );
103
104        Some(ref_file_set)
105    }
106
107    /// Gets all ref files for the given table id, excluding those already in region manifest.
108    ///
109    /// It's safe if manifest version became outdated when gc worker is called, as gc worker will check the changes between those two versions and act accordingly to make sure to get the real truly tmp ref file sets at the time of old manifest version.
110    ///
111    /// TODO(discord9): Since query will only possible refer to files in latest manifest when it's started, the only true risks is files removed from manifest between old version(when reading refs) and new version(at gc worker), so in case of having outdated manifest version, gc worker should make sure not to delete those files(Until next gc round which will use the latest manifest version and handle those files normally).
112    /// or perhaps using a two-phase commit style process where it proposes a set of files for deletion and then verifies no new references have appeared before committing the delete.
113    ///
114    /// gc worker could do this:
115    /// 1. if can get the files that got removed from old manifest to new manifest, then shouldn't delete those files even if they are not in tmp ref file, other files can be normally handled(deleted if not in use, otherwise keep)
116    ///    and report back allow next gc round to handle those files with newer tmp ref file sets.
117    /// 2. if can't get the files that got removed from old manifest to new manifest(possible if just did a checkpoint),
118    ///    then can do nothing as can't sure whether a file is truly unused or just tmp ref file sets haven't report it, so need to report back and try next gc round to handle those files with newer tmp ref file sets.
119    ///
120    #[allow(unused)]
121    pub(crate) async fn get_snapshot_of_unmanifested_refs(
122        &self,
123        table_id: TableId,
124        region_map: &RegionMapRef,
125    ) -> Result<TableFileRefsManifest> {
126        let Some(ref_files) = self.ref_file_set(table_id) else {
127            return Ok(Default::default());
128        };
129        let region_list = region_map.list_regions();
130        let table_regions = region_list
131            .iter()
132            .filter(|r| r.region_id().table_id() == table_id)
133            .collect::<Vec<_>>();
134
135        let mut in_manifest_files = HashSet::new();
136        let mut manifest_version = HashMap::new();
137
138        for r in &table_regions {
139            let manifest = r.manifest_ctx.manifest().await;
140            let files = manifest.files.keys().cloned().collect::<Vec<_>>();
141            in_manifest_files.extend(files);
142            manifest_version.insert(r.region_id(), manifest.manifest_version);
143        }
144
145        let ref_files_excluding_in_manifest = ref_files
146            .iter()
147            .filter(|f| !in_manifest_files.contains(&f.file_id))
148            .cloned()
149            .collect::<HashSet<_>>();
150
151        Ok(TableFileRefsManifest {
152            file_refs: ref_files_excluding_in_manifest,
153            manifest_version,
154        })
155    }
156
157    /// Adds a new file reference.
158    /// Also records the access layer for the table if not exists.
159    /// The access layer will be used to upload ref file to object storage.
160    pub fn add_file(&self, file_meta: &FileMeta) {
161        let table_id = file_meta.region_id.table_id();
162        let mut is_new = false;
163        {
164            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
165            self.files_per_table
166                .entry(table_id)
167                .and_modify(|refs| {
168                    refs.files
169                        .entry(file_ref.clone())
170                        .and_modify(|count| *count += 1)
171                        .or_insert_with(|| {
172                            is_new = true;
173                            1
174                        });
175                })
176                .or_insert_with(|| TableFileRefs {
177                    files: HashMap::from_iter([(file_ref, 1)]),
178                });
179        }
180        if is_new {
181            GC_REF_FILE_CNT.inc();
182        }
183    }
184
185    /// Removes a file reference.
186    /// If the reference count reaches zero, the file reference will be removed from the manager.
187    pub fn remove_file(&self, file_meta: &FileMeta) {
188        let table_id = file_meta.region_id.table_id();
189        let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
190
191        let mut remove_table_entry = false;
192        let mut remove_file_ref = false;
193        let mut file_cnt = 0;
194
195        let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
196            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
197                if *count > 0 {
198                    *count -= 1;
199                }
200                if *count == 0 {
201                    remove_file_ref = true;
202                }
203            });
204            if let std::collections::hash_map::Entry::Occupied(o) = entry
205                && remove_file_ref
206            {
207                o.remove_entry();
208            }
209
210            file_cnt = refs.files.len();
211
212            if refs.files.is_empty() {
213                remove_table_entry = true;
214            }
215        });
216
217        if let Entry::Occupied(o) = table_ref
218            && remove_table_entry
219        {
220            o.remove_entry();
221        }
222        if remove_file_ref {
223            GC_REF_FILE_CNT.dec();
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::{FileId, FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Builds a `FileMeta` for `sst_file_id` with fixed placeholder statistics.
    fn new_file_meta(sst_file_id: RegionFileId) -> FileMeta {
        FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
        }
    }

    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let file_ref_mgr = FileReferenceManager::new(None);
        let file_meta = new_file_meta(sst_file_id);

        let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
        let expected_table_ref_manifest = HashSet::from_iter([file_ref.clone()]);

        // Table 0 must track exactly `file_ref`, with reference count `expected`.
        let assert_ref_count = |expected: usize| {
            assert_eq!(
                file_ref_mgr.files_per_table.get(&0).unwrap().files,
                HashMap::from_iter([(file_ref.clone(), expected)])
            );
        };

        // First add: one reference.
        file_ref_mgr.add_file(&file_meta);
        assert_ref_count(1);

        // Second add: count grows, the set of referenced files does not.
        file_ref_mgr.add_file(&file_meta);
        assert_eq!(
            file_ref_mgr.ref_file_set(0).unwrap(),
            expected_table_ref_manifest
        );
        assert_ref_count(2);
        assert_eq!(
            file_ref_mgr.ref_file_set(0).unwrap(),
            expected_table_ref_manifest
        );

        // First remove: still referenced once.
        file_ref_mgr.remove_file(&file_meta);
        assert_ref_count(1);
        assert_eq!(
            file_ref_mgr.ref_file_set(0).unwrap(),
            expected_table_ref_manifest
        );

        // Last remove: the whole table entry disappears.
        file_ref_mgr.remove_file(&file_meta);
        assert!(
            file_ref_mgr.files_per_table.get(&0).is_none(),
            "{:?}",
            file_ref_mgr.files_per_table
        );
        assert!(
            file_ref_mgr.ref_file_set(0).is_none(),
            "{:?}",
            file_ref_mgr.files_per_table
        );
    }
}