mito2/sst/
file_ref.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use serde::{Deserialize, Serialize};
21use store_api::ManifestVersion;
22use store_api::storage::{FileId, RegionId, TableId};
23
24use crate::error::Result;
25use crate::metrics::GC_REF_FILE_CNT;
26use crate::region::RegionMapRef;
27use crate::sst::file::FileMeta;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct FileRef {
31    pub region_id: RegionId,
32    pub file_id: FileId,
33}
34
35impl FileRef {
36    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
37        Self { region_id, file_id }
38    }
39}
40
41/// File references for a table.
42/// It contains all files referenced by the table.
43#[derive(Debug, Clone, Default)]
44pub struct TableFileRefs {
45    /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
46    pub files: HashMap<FileRef, usize>,
47}
48
49/// Manages all file references in one datanode.
50/// It keeps track of which files are referenced and group by table ids.
51/// And periodically update the references to tmp file in object storage.
52/// This is useful for ensuring that files are not deleted while they are still in use by any
53/// query.
54#[derive(Debug)]
55pub struct FileReferenceManager {
56    /// Datanode id. used to determine tmp ref file name.
57    node_id: Option<u64>,
58    /// TODO(discord9): use no hash hasher since table id is sequential.
59    files_per_table: DashMap<TableId, TableFileRefs>,
60}
61
62pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
63
64/// The tmp file uploaded to object storage to record one table's file references.
65#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
66pub struct TableFileRefsManifest {
67    pub file_refs: HashSet<FileRef>,
68    /// Manifest version when this manifest is read for it's files
69    pub manifest_version: HashMap<RegionId, ManifestVersion>,
70}
71
72impl FileReferenceManager {
73    pub fn new(node_id: Option<u64>) -> Self {
74        Self {
75            node_id,
76            files_per_table: Default::default(),
77        }
78    }
79
80    fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
81        let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
82            file_refs.clone()
83        } else {
84            // still return an empty manifest to indicate no files are referenced.
85            // and differentiate from error case where table_id not found.
86            return None;
87        };
88
89        if file_refs.files.is_empty() {
90            // still return an empty manifest to indicate no files are referenced.
91            // and differentiate from error case where table_id not found.
92            return Some(HashSet::new());
93        }
94
95        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
96
97        debug!(
98            "Get file refs for table {}, node {:?}, {} files",
99            table_id,
100            self.node_id,
101            ref_file_set.len(),
102        );
103
104        Some(ref_file_set)
105    }
106
107    /// Gets all ref files for the given table id, excluding those already in region manifest.
108    ///
109    /// It's safe if manifest version became outdated when gc worker is called, as gc worker will check the changes between those two versions and act accordingly to make sure to get the real truly tmp ref file sets at the time of old manifest version.
110    ///
111    /// TODO(discord9): Since query will only possible refer to files in latest manifest when it's started, the only true risks is files removed from manifest between old version(when reading refs) and new version(at gc worker), so in case of having outdated manifest version, gc worker should make sure not to delete those files(Until next gc round which will use the latest manifest version and handle those files normally).
112    /// or perhaps using a two-phase commit style process where it proposes a set of files for deletion and then verifies no new references have appeared before committing the delete.
113    ///
114    /// gc worker could do this:
115    /// 1. if can get the files that got removed from old manifest to new manifest, then shouldn't delete those files even if they are not in tmp ref file, other files can be normally handled(deleted if not in use, otherwise keep)
116    ///    and report back allow next gc round to handle those files with newer tmp ref file sets.
117    /// 2. if can't get the files that got removed from old manifest to new manifest(possible if just did a checkpoint),
118    ///    then can do nothing as can't sure whether a file is truly unused or just tmp ref file sets haven't report it, so need to report back and try next gc round to handle those files with newer tmp ref file sets.
119    ///
120    #[allow(unused)]
121    pub(crate) async fn get_snapshot_of_unmanifested_refs(
122        &self,
123        table_id: TableId,
124        region_map: &RegionMapRef,
125    ) -> Result<TableFileRefsManifest> {
126        let Some(ref_files) = self.ref_file_set(table_id) else {
127            return Ok(Default::default());
128        };
129        let region_list = region_map.list_regions();
130        let table_regions = region_list
131            .iter()
132            .filter(|r| r.region_id().table_id() == table_id)
133            .collect::<Vec<_>>();
134
135        let mut in_manifest_files = HashSet::new();
136        let mut manifest_version = HashMap::new();
137
138        for r in &table_regions {
139            let manifest = r.manifest_ctx.manifest().await;
140            let files = manifest.files.keys().cloned().collect::<Vec<_>>();
141            in_manifest_files.extend(files);
142            manifest_version.insert(r.region_id(), manifest.manifest_version);
143        }
144
145        let ref_files_excluding_in_manifest = ref_files
146            .iter()
147            .filter(|f| !in_manifest_files.contains(&f.file_id))
148            .cloned()
149            .collect::<HashSet<_>>();
150
151        Ok(TableFileRefsManifest {
152            file_refs: ref_files_excluding_in_manifest,
153            manifest_version,
154        })
155    }
156
157    /// Adds a new file reference.
158    /// Also records the access layer for the table if not exists.
159    /// The access layer will be used to upload ref file to object storage.
160    pub fn add_file(&self, file_meta: &FileMeta) {
161        let table_id = file_meta.region_id.table_id();
162        let mut is_new = false;
163        {
164            let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
165            self.files_per_table
166                .entry(table_id)
167                .and_modify(|refs| {
168                    refs.files
169                        .entry(file_ref.clone())
170                        .and_modify(|count| *count += 1)
171                        .or_insert_with(|| {
172                            is_new = true;
173                            1
174                        });
175                })
176                .or_insert_with(|| TableFileRefs {
177                    files: HashMap::from_iter([(file_ref, 1)]),
178                });
179        }
180        if is_new {
181            GC_REF_FILE_CNT.inc();
182        }
183    }
184
185    /// Removes a file reference.
186    /// If the reference count reaches zero, the file reference will be removed from the manager.
187    pub fn remove_file(&self, file_meta: &FileMeta) {
188        let table_id = file_meta.region_id.table_id();
189        let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
190
191        let mut remove_table_entry = false;
192        let mut remove_file_ref = false;
193        let mut file_cnt = 0;
194
195        let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
196            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
197                if *count > 0 {
198                    *count -= 1;
199                }
200                if *count == 0 {
201                    remove_file_ref = true;
202                }
203            });
204            if let std::collections::hash_map::Entry::Occupied(o) = entry
205                && remove_file_ref
206            {
207                o.remove_entry();
208            }
209
210            file_cnt = refs.files.len();
211
212            if refs.files.is_empty() {
213                remove_table_entry = true;
214            }
215        });
216
217        if let Entry::Occupied(o) = table_ref
218            && remove_table_entry
219        {
220            o.remove_entry();
221        }
222        if remove_file_ref {
223            GC_REF_FILE_CNT.dec();
224        }
225    }
226
227    pub fn node_id(&self) -> Option<u64> {
228        self.node_id
229    }
230}
231
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Builds a `FileMeta` for `sst_file_id` with fixed, arbitrary statistics.
    fn test_file_meta(sst_file_id: &RegionFileId) -> FileMeta {
        FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 4096,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
        }
    }

    /// Copies the per-file ref counts tracked for `table_id`; panics if the table
    /// has no entry.
    fn ref_counts(mgr: &FileReferenceManager, table_id: TableId) -> HashMap<FileRef, usize> {
        mgr.files_per_table.get(&table_id).unwrap().files.clone()
    }

    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
        let mgr = FileReferenceManager::new(None);
        let meta = test_file_meta(&sst_file_id);
        let file_ref = FileRef::new(meta.region_id, meta.file_id);
        let expected_set = HashSet::from([file_ref.clone()]);

        // First handle: the file is tracked with a count of one.
        mgr.add_file(&meta);
        assert_eq!(ref_counts(&mgr, 0), HashMap::from([(file_ref.clone(), 1)]));

        // Second handle: the count grows while the referenced set stays the same.
        mgr.add_file(&meta);
        assert_eq!(mgr.ref_file_set(0).unwrap(), expected_set);
        assert_eq!(ref_counts(&mgr, 0), HashMap::from([(file_ref.clone(), 2)]));
        assert_eq!(mgr.ref_file_set(0).unwrap(), expected_set);

        // Releasing one handle keeps the file referenced.
        mgr.remove_file(&meta);
        assert_eq!(ref_counts(&mgr, 0), HashMap::from([(file_ref, 1)]));
        assert_eq!(mgr.ref_file_set(0).unwrap(), expected_set);

        // Releasing the last handle drops the file and, with it, the table entry.
        mgr.remove_file(&meta);
        assert!(
            mgr.files_per_table.get(&0).is_none(),
            "{:?}",
            mgr.files_per_table
        );
        assert!(mgr.ref_file_set(0).is_none(), "{:?}", mgr.files_per_table);
    }
}