mito2/manifest/storage/
size_tracker.rs1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, RwLock};
19
20use store_api::ManifestVersion;
21
22#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
24pub(crate) enum FileKey {
25 Delta(ManifestVersion),
27 Checkpoint(ManifestVersion),
29}
30
31pub(crate) trait Tracker: Send + Sync + Debug {
32 fn record(&self, version: ManifestVersion, size: u64);
33}
34
35#[derive(Debug, Clone)]
36pub struct CheckpointTracker {
37 size_tracker: SizeTracker,
38}
39
40impl Tracker for CheckpointTracker {
41 fn record(&self, version: ManifestVersion, size: u64) {
42 self.size_tracker.record(FileKey::Checkpoint(version), size);
43 }
44}
45
46#[derive(Debug, Clone)]
47pub struct DeltaTracker {
48 size_tracker: SizeTracker,
49}
50
51impl Tracker for DeltaTracker {
52 fn record(&self, version: ManifestVersion, size: u64) {
53 self.size_tracker.record(FileKey::Delta(version), size);
54 }
55}
56
57#[derive(Debug, Clone)]
58pub struct NoopTracker;
59
60impl Tracker for NoopTracker {
61 fn record(&self, _version: ManifestVersion, _size: u64) {
62 }
64}
65
66#[derive(Debug, Clone, Default)]
67pub(crate) struct SizeTracker {
68 file_sizes: Arc<RwLock<HashMap<FileKey, u64>>>,
69 total_size: Arc<AtomicU64>,
70}
71
72impl SizeTracker {
73 pub fn new(total_size: Arc<AtomicU64>) -> Self {
75 Self {
76 file_sizes: Arc::new(RwLock::new(HashMap::new())),
77 total_size,
78 }
79 }
80
81 pub(crate) fn manifest_tracker(&self) -> DeltaTracker {
83 DeltaTracker {
84 size_tracker: self.clone(),
85 }
86 }
87
88 pub(crate) fn checkpoint_tracker(&self) -> CheckpointTracker {
90 CheckpointTracker {
91 size_tracker: self.clone(),
92 }
93 }
94
95 pub(crate) fn record_delta(&self, version: ManifestVersion, size: u64) {
97 self.record(FileKey::Delta(version), size);
98 }
99
100 pub(crate) fn record_checkpoint(&self, version: ManifestVersion, size: u64) {
102 self.record(FileKey::Checkpoint(version), size);
103 }
104
105 pub(crate) fn remove(&self, key: &FileKey) {
107 if let Some(size) = self.file_sizes.write().unwrap().remove(key) {
108 self.total_size.fetch_sub(size, Ordering::Relaxed);
109 }
110 }
111
112 pub(crate) fn total(&self) -> u64 {
114 self.total_size.load(Ordering::Relaxed)
115 }
116
117 pub(crate) fn reset(&self) {
119 self.file_sizes.write().unwrap().clear();
120 self.total_size.store(0, Ordering::Relaxed);
121 }
122
123 fn record(&self, key: FileKey, size: u64) {
124 if let Some(old_size) = self.file_sizes.write().unwrap().insert(key, size) {
126 self.total_size.fetch_sub(old_size, Ordering::Relaxed);
127 }
128 self.total_size.fetch_add(size, Ordering::Relaxed);
129 }
130}