puffin/puffin_manager/stager.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bounded_stager;
16
17use std::path::PathBuf;
18use std::time::Duration;
19
20use async_trait::async_trait;
21pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
22use futures::AsyncWrite;
23use futures::future::BoxFuture;
24
25use crate::error::Result;
26use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
27
28pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
29
30/// Result containing the number of bytes written (u64).
31pub type WriteResult = BoxFuture<'static, Result<u64>>;
32
33/// `DirWriterProvider` provides a way to write files into a directory.
34#[async_trait]
35pub trait DirWriterProvider {
36 /// Creates a writer for the given relative path.
37 async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
38}
39
40pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
41
42/// Function that initializes a blob.
43///
44/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
45/// can use to write the blob into the staging area.
46pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
47
48/// Function that initializes a directory.
49///
50/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
51/// can use to write files inside the directory into the staging area.
52pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
53
54/// `Stager` manages the staging area for the puffin files.
55#[async_trait]
56#[auto_impl::auto_impl(Arc)]
57pub trait Stager: Send + Sync {
58 type Blob: BlobGuard + Sync;
59 type Dir: DirGuard;
60 type FileHandle: ToString + Clone + Send + Sync;
61
62 /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
63 ///
64 /// The returned `BlobGuard` is used to access the blob reader.
65 /// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
66 async fn get_blob<'a>(
67 &self,
68 handle: &Self::FileHandle,
69 key: &str,
70 init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
71 ) -> Result<Self::Blob>;
72
73 /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
74 ///
75 /// The returned tuple contains the `DirGuard` and `DirMetrics`.
76 /// The `DirGuard` is used to access the directory in the filesystem.
77 /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
78 async fn get_dir<'a>(
79 &self,
80 handle: &Self::FileHandle,
81 key: &str,
82 init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
83 ) -> Result<(Self::Dir, DirMetrics)>;
84
85 /// Stores a directory in the staging area.
86 async fn put_dir(
87 &self,
88 handle: &Self::FileHandle,
89 key: &str,
90 dir_path: PathBuf,
91 dir_size: u64,
92 ) -> Result<()>;
93
94 /// Purges all content for the given puffin file from the staging area.
95 async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
96}
97
/// `StagerNotifier` provides a way to notify the caller of the staging events.
///
/// Implementations must be `Send + Sync`, since a notifier may be shared by the
/// stager across tasks.
pub trait StagerNotifier: Send + Sync {
    /// Notifies the caller that a cache hit occurred.
    /// `size` is the size of the content that was hit in the cache.
    fn on_cache_hit(&self, size: u64);

    /// Notifies the caller that a cache miss occurred.
    /// `size` is the size of the content that was missed in the cache.
    fn on_cache_miss(&self, size: u64);

    /// Notifies the caller that a blob or directory was inserted into the cache.
    /// `size` is the size of the content that was inserted into the cache.
    ///
    /// Note: this event is triggered not only by cache misses, but also by
    /// recoveries and recycles.
    fn on_cache_insert(&self, size: u64);

    /// Notifies the caller that a directory was loaded into the cache.
    /// `duration` is the time it took to load the directory.
    fn on_load_dir(&self, duration: Duration);

    /// Notifies the caller that a blob was loaded into the cache.
    /// `duration` is the time it took to load the blob.
    fn on_load_blob(&self, duration: Duration);

    /// Notifies the caller that a blob or directory was evicted from the cache.
    /// `size` is the size of the content that was evicted from the cache.
    fn on_cache_evict(&self, size: u64);

    /// Notifies the caller that a blob or directory was moved to the recycle bin.
    /// `size` is the size of the content that was moved to the recycle bin.
    fn on_recycle_insert(&self, size: u64);

    /// Notifies the caller that the recycle bin was cleared.
    /// `size` is the size of the content that was cleared from the recycle bin.
    fn on_recycle_clear(&self, size: u64);
}