mito2/sst/
file_ref.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use dashmap::{DashMap, Entry};
20use store_api::storage::{FileRef, FileRefsManifest, RegionId};
21
22use crate::error::Result;
23use crate::metrics::GC_REF_FILE_CNT;
24use crate::region::MitoRegionRef;
25use crate::sst::file::FileMeta;
26
27/// File references for a region.
28/// It contains all files referenced by the region.
29#[derive(Debug, Clone, Default)]
30pub struct RegionFileRefs {
31    /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
32    pub files: HashMap<FileRef, usize>,
33}
34
35/// Manages all file references in one datanode.
36/// It keeps track of which files are referenced and group by table ids.
37/// This is useful for ensuring that files are not deleted while they are still in use by any
38/// query.
39#[derive(Debug)]
40pub struct FileReferenceManager {
41    /// Datanode id. used to determine tmp ref file name.
42    node_id: Option<u64>,
43    /// TODO(discord9): use no hash hasher since table id is sequential.
44    files_per_region: DashMap<RegionId, RegionFileRefs>,
45}
46
47pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
48
49impl FileReferenceManager {
50    pub fn new(node_id: Option<u64>) -> Self {
51        Self {
52            node_id,
53            files_per_region: Default::default(),
54        }
55    }
56
57    fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
58        let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
59            file_refs.clone()
60        } else {
61            // region id not found.
62            return None;
63        };
64
65        if file_refs.files.is_empty() {
66            // still return an empty manifest to indicate no files are referenced.
67            // and differentiate from error case where table_id not found.
68            return Some(HashSet::new());
69        }
70
71        let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
72
73        debug!(
74            "Get file refs for region {}, node {:?}, {} files",
75            region_id,
76            self.node_id,
77            ref_file_set.len(),
78        );
79
80        Some(ref_file_set)
81    }
82
83    /// Gets all ref files for the given regions, meaning all open FileHandles for those regions
84    /// and from related regions' manifests.
85    /// `query_regions_for_mem` queries for in memory file handles.
86    /// `related_regions_in_manifest` queries for related regions' manifests to get more file refs of given region ids.
87    pub(crate) async fn get_snapshot_of_file_refs(
88        &self,
89        query_regions_for_mem: Vec<MitoRegionRef>,
90        related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
91    ) -> Result<FileRefsManifest> {
92        let mut ref_files = HashMap::new();
93        // get from in memory file handles
94        for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
95            if let Some(files) = self.ref_file_set(region_id) {
96                ref_files.insert(region_id, files);
97            }
98        }
99
100        let mut manifest_version = HashMap::new();
101
102        // get file refs from related regions' manifests
103        for (related_region, queries) in &related_regions_in_manifest {
104            let queries = queries.iter().cloned().collect::<HashSet<_>>();
105            let manifest = related_region.manifest_ctx.manifest().await;
106            for meta in manifest.files.values() {
107                if queries.contains(&meta.region_id) {
108                    // since gc couldn't happen together with repartition
109                    // (both the queries and related_region acquire region read lock), no need to worry about
110                    // staging manifest in repartition here.
111                    ref_files
112                        .entry(meta.region_id)
113                        .or_insert_with(HashSet::new)
114                        .insert(FileRef::new(
115                            meta.region_id,
116                            meta.file_id,
117                            meta.index_version(),
118                        ));
119                }
120            }
121            // not sure if related region's manifest version is needed, but record it for now.
122            manifest_version.insert(related_region.region_id(), manifest.manifest_version);
123        }
124
125        for r in &query_regions_for_mem {
126            let manifest = r.manifest_ctx.manifest().await;
127            // remove in manifest files for smaller size, since gc worker read from manifest later.
128            ref_files.entry(r.region_id()).and_modify(|refs| {
129                *refs = std::mem::take(refs)
130                    .into_iter()
131                    .filter(|f| !manifest.files.contains_key(&f.file_id))
132                    .collect();
133            });
134            manifest_version.insert(r.region_id(), manifest.manifest_version);
135        }
136
137        // simply return all ref files, no manifest version filtering for now.
138        Ok(FileRefsManifest {
139            file_refs: ref_files,
140            manifest_version,
141        })
142    }
143
144    /// Adds a new file reference.
145    /// Also records the access layer for the table if not exists.
146    /// The access layer will be used to upload ref file to object storage.
147    pub fn add_file(&self, file_meta: &FileMeta) {
148        let region_id = file_meta.region_id;
149        let mut is_new = false;
150        {
151            let file_ref = FileRef::new(
152                file_meta.region_id,
153                file_meta.file_id,
154                file_meta.index_version(),
155            );
156            self.files_per_region
157                .entry(region_id)
158                .and_modify(|refs| {
159                    refs.files
160                        .entry(file_ref.clone())
161                        .and_modify(|count| *count += 1)
162                        .or_insert_with(|| {
163                            is_new = true;
164                            1
165                        });
166                })
167                .or_insert_with(|| RegionFileRefs {
168                    files: HashMap::from_iter([(file_ref, 1)]),
169                });
170        }
171        if is_new {
172            GC_REF_FILE_CNT.inc();
173        }
174    }
175
176    /// Removes a file reference.
177    /// If the reference count reaches zero, the file reference will be removed from the manager.
178    pub fn remove_file(&self, file_meta: &FileMeta) {
179        let region_id = file_meta.region_id;
180        let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
181
182        let mut remove_table_entry = false;
183        let mut remove_file_ref = false;
184        let mut file_cnt = 0;
185
186        let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
187            let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
188                if *count > 0 {
189                    *count -= 1;
190                }
191                if *count == 0 {
192                    remove_file_ref = true;
193                }
194            });
195            if let std::collections::hash_map::Entry::Occupied(o) = entry
196                && remove_file_ref
197            {
198                o.remove_entry();
199            }
200
201            file_cnt = refs.files.len();
202
203            if refs.files.is_empty() {
204                remove_table_entry = true;
205            }
206        });
207
208        if let Entry::Occupied(o) = region_ref
209            && remove_table_entry
210        {
211            o.remove_entry();
212        }
213        if remove_file_ref {
214            GC_REF_FILE_CNT.dec();
215        }
216    }
217
218    pub fn node_id(&self) -> Option<u64> {
219        self.node_id
220    }
221}
222
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use smallvec::SmallVec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::sst::file::{ColumnIndexMetadata, FileMeta, FileTimeRange, IndexType, RegionFileId};

    /// Builds the meta of a small, single-index SST file identified by `sst_file_id`.
    fn new_file_meta(sst_file_id: RegionFileId) -> FileMeta {
        FileMeta {
            region_id: sst_file_id.region_id(),
            file_id: sst_file_id.file_id(),
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 4096,
            max_row_group_uncompressed_size: 4096,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 4096,
            index_version: 0,
            num_rows: 1024,
            num_row_groups: 1,
            sequence: NonZeroU64::new(4096),
            partition_expr: None,
            num_series: 0,
        }
    }

    #[tokio::test]
    async fn test_file_ref_mgr() {
        common_telemetry::init_default_ut_logging();

        let file_meta = new_file_meta(RegionFileId::new(RegionId::new(0, 0), FileId::random()));
        let region_id = file_meta.region_id;
        let file_ref = FileRef::new(region_id, file_meta.file_id, file_meta.index_version());
        let expected_ref_set = HashSet::from_iter([file_ref.clone()]);

        let mgr = FileReferenceManager::new(None);
        // Copy of the per-file reference counts currently tracked for `region_id`.
        let ref_counts = |manager: &FileReferenceManager| {
            manager.files_per_region.get(&region_id).unwrap().files.clone()
        };

        // First reference.
        mgr.add_file(&file_meta);
        assert_eq!(ref_counts(&mgr), HashMap::from_iter([(file_ref.clone(), 1)]));

        // Second reference to the same file only bumps the count.
        mgr.add_file(&file_meta);
        assert_eq!(mgr.ref_file_set(region_id).unwrap(), expected_ref_set);
        assert_eq!(ref_counts(&mgr), HashMap::from_iter([(file_ref.clone(), 2)]));
        assert_eq!(mgr.ref_file_set(region_id).unwrap(), expected_ref_set);

        // Dropping one reference keeps the file tracked.
        mgr.remove_file(&file_meta);
        assert_eq!(ref_counts(&mgr), HashMap::from_iter([(file_ref.clone(), 1)]));
        assert_eq!(mgr.ref_file_set(region_id).unwrap(), expected_ref_set);

        // Dropping the last reference removes the whole region entry.
        mgr.remove_file(&file_meta);
        assert!(
            mgr.files_per_region.get(&region_id).is_none(),
            "{:?}",
            mgr.files_per_region
        );
        assert!(
            mgr.ref_file_set(region_id).is_none(),
            "{:?}",
            mgr.files_per_region
        );
    }
}