puffin/puffin_manager/stager.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bounded_stager;
16
17use std::path::PathBuf;
18use std::time::Duration;
19
20use async_trait::async_trait;
21pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
22use futures::future::BoxFuture;
23use futures::AsyncWrite;
24
25use crate::error::Result;
26use crate::puffin_manager::{BlobGuard, DirGuard};
27
28pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
29
30/// Result containing the number of bytes written (u64).
31pub type WriteResult = BoxFuture<'static, Result<u64>>;
32
33/// `DirWriterProvider` provides a way to write files into a directory.
34#[async_trait]
35pub trait DirWriterProvider {
36 /// Creates a writer for the given relative path.
37 async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
38}
39
40pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
41
42/// Function that initializes a blob.
43///
44/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
45/// can use to write the blob into the staging area.
46pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
47
48/// Function that initializes a directory.
49///
50/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
51/// can use to write files inside the directory into the staging area.
52pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
53
54/// `Stager` manages the staging area for the puffin files.
55#[async_trait]
56#[auto_impl::auto_impl(Arc)]
57pub trait Stager: Send + Sync {
58 type Blob: BlobGuard + Sync;
59 type Dir: DirGuard;
60 type FileHandle: ToString + Clone + Send + Sync;
61
62 /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
63 ///
64 /// The returned `BlobGuard` is used to access the blob reader.
65 /// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
66 async fn get_blob<'a>(
67 &self,
68 handle: &Self::FileHandle,
69 key: &str,
70 init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
71 ) -> Result<Self::Blob>;
72
73 /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
74 ///
75 /// The returned `DirGuard` is used to access the directory in the filesystem.
76 /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
77 async fn get_dir<'a>(
78 &self,
79 handle: &Self::FileHandle,
80 key: &str,
81 init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
82 ) -> Result<Self::Dir>;
83
84 /// Stores a directory in the staging area.
85 async fn put_dir(
86 &self,
87 handle: &Self::FileHandle,
88 key: &str,
89 dir_path: PathBuf,
90 dir_size: u64,
91 ) -> Result<()>;
92
93 /// Purges all content for the given puffin file from the staging area.
94 async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
95}
96
/// Receives notifications about events that happen in the staging area.
///
/// Implementors must be `Send + Sync`.
pub trait StagerNotifier: Send + Sync {
    /// Called on a cache hit; `size` is the size of the content found in the cache.
    fn on_cache_hit(&self, size: u64);

    /// Called on a cache miss; `size` is the size of the content that was missing.
    fn on_cache_miss(&self, size: u64);

    /// Called when a blob or directory enters the cache; `size` is the size of
    /// the inserted content.
    ///
    /// Not only cache misses trigger this event: recoveries and recycles do too.
    fn on_cache_insert(&self, size: u64);

    /// Called after a directory has been loaded into the cache; `duration` is
    /// how long loading the directory took.
    fn on_load_dir(&self, duration: Duration);

    /// Called after a blob has been loaded into the cache; `duration` is how
    /// long loading the blob took.
    fn on_load_blob(&self, duration: Duration);

    /// Called when a blob or directory is evicted from the cache; `size` is the
    /// size of the evicted content.
    fn on_cache_evict(&self, size: u64);

    /// Called when a blob or directory is dropped into the recycle bin; `size`
    /// is the size of the recycled content.
    fn on_recycle_insert(&self, size: u64);

    /// Called when the recycle bin is cleared; `size` is the size of the content
    /// removed from it.
    fn on_recycle_clear(&self, size: u64);
}