mito2/manifest/storage/
size_tracker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, RwLock};
19
20use store_api::ManifestVersion;
21
22/// Key to identify a manifest file.
23#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
24pub(crate) enum FileKey {
25    /// A delta file (`.json`).
26    Delta(ManifestVersion),
27    /// A checkpoint file (`.checkpoint`).
28    Checkpoint(ManifestVersion),
29}
30
31pub(crate) trait Tracker: Send + Sync + Debug {
32    fn record(&self, version: ManifestVersion, size: u64);
33}
34
35#[derive(Debug, Clone)]
36pub struct CheckpointTracker {
37    size_tracker: SizeTracker,
38}
39
40impl Tracker for CheckpointTracker {
41    fn record(&self, version: ManifestVersion, size: u64) {
42        self.size_tracker.record(FileKey::Checkpoint(version), size);
43    }
44}
45
46#[derive(Debug, Clone)]
47pub struct DeltaTracker {
48    size_tracker: SizeTracker,
49}
50
51impl Tracker for DeltaTracker {
52    fn record(&self, version: ManifestVersion, size: u64) {
53        self.size_tracker.record(FileKey::Delta(version), size);
54    }
55}
56
57#[derive(Debug, Clone)]
58pub struct NoopTracker;
59
60impl Tracker for NoopTracker {
61    fn record(&self, _version: ManifestVersion, _size: u64) {
62        // noop
63    }
64}
65
66#[derive(Debug, Clone, Default)]
67pub(crate) struct SizeTracker {
68    file_sizes: Arc<RwLock<HashMap<FileKey, u64>>>,
69    total_size: Arc<AtomicU64>,
70}
71
72impl SizeTracker {
73    /// Returns a new [SizeTracker].
74    pub fn new(total_size: Arc<AtomicU64>) -> Self {
75        Self {
76            file_sizes: Arc::new(RwLock::new(HashMap::new())),
77            total_size,
78        }
79    }
80
81    /// Returns the manifest tracker.
82    pub(crate) fn manifest_tracker(&self) -> DeltaTracker {
83        DeltaTracker {
84            size_tracker: self.clone(),
85        }
86    }
87
88    /// Returns the checkpoint tracker.
89    pub(crate) fn checkpoint_tracker(&self) -> CheckpointTracker {
90        CheckpointTracker {
91            size_tracker: self.clone(),
92        }
93    }
94
95    /// Records a delta file size.
96    pub(crate) fn record_delta(&self, version: ManifestVersion, size: u64) {
97        self.record(FileKey::Delta(version), size);
98    }
99
100    /// Records a checkpoint file size.
101    pub(crate) fn record_checkpoint(&self, version: ManifestVersion, size: u64) {
102        self.record(FileKey::Checkpoint(version), size);
103    }
104
105    /// Removes a file from tracking.
106    pub(crate) fn remove(&self, key: &FileKey) {
107        if let Some(size) = self.file_sizes.write().unwrap().remove(key) {
108            self.total_size.fetch_sub(size, Ordering::Relaxed);
109        }
110    }
111
112    /// Returns the total tracked size.
113    pub(crate) fn total(&self) -> u64 {
114        self.total_size.load(Ordering::Relaxed)
115    }
116
117    /// Resets all tracking.
118    pub(crate) fn reset(&self) {
119        self.file_sizes.write().unwrap().clear();
120        self.total_size.store(0, Ordering::Relaxed);
121    }
122
123    fn record(&self, key: FileKey, size: u64) {
124        // Remove the old size if present
125        if let Some(old_size) = self.file_sizes.write().unwrap().insert(key, size) {
126            self.total_size.fetch_sub(old_size, Ordering::Relaxed);
127        }
128        self.total_size.fetch_add(size, Ordering::Relaxed);
129    }
130}