// puffin/puffin_manager/stager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
mod bounded_stager;

use std::path::PathBuf;
use std::time::Duration;

use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use futures::AsyncWrite;
use futures::future::BoxFuture;

use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
27
28pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
29
30/// Result containing the number of bytes written (u64).
31pub type WriteResult = BoxFuture<'static, Result<u64>>;
32
33/// `DirWriterProvider` provides a way to write files into a directory.
34#[async_trait]
35pub trait DirWriterProvider {
36    /// Creates a writer for the given relative path.
37    async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
38}
39
40pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
41
42/// Function that initializes a blob.
43///
44/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
45/// can use to write the blob into the staging area.
46pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
47
48/// Function that initializes a directory.
49///
50/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
51/// can use to write files inside the directory into the staging area.
52pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
53
54/// `Stager` manages the staging area for the puffin files.
55#[async_trait]
56#[auto_impl::auto_impl(Arc)]
57pub trait Stager: Send + Sync {
58    type Blob: BlobGuard + Sync;
59    type Dir: DirGuard;
60    type FileHandle: ToString + Clone + Send + Sync;
61
62    /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
63    ///
64    /// The returned `BlobGuard` is used to access the blob reader.
65    /// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
66    async fn get_blob<'a>(
67        &self,
68        handle: &Self::FileHandle,
69        key: &str,
70        init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
71    ) -> Result<Self::Blob>;
72
73    /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
74    ///
75    /// The returned tuple contains the `DirGuard` and `DirMetrics`.
76    /// The `DirGuard` is used to access the directory in the filesystem.
77    /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
78    async fn get_dir<'a>(
79        &self,
80        handle: &Self::FileHandle,
81        key: &str,
82        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
83    ) -> Result<(Self::Dir, DirMetrics)>;
84
85    /// Stores a directory in the staging area.
86    async fn put_dir(
87        &self,
88        handle: &Self::FileHandle,
89        key: &str,
90        dir_path: PathBuf,
91        dir_size: u64,
92    ) -> Result<()>;
93
94    /// Purges all content for the given puffin file from the staging area.
95    async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
96}
97
/// `StagerNotifier` provides a way to notify the caller of the staging events.
///
/// Every callback takes `&self`, and implementors must be `Send + Sync`.
pub trait StagerNotifier: Send + Sync {
    /// Notifies the caller that a cache hit occurred.
    /// `size` is the size of the content that was hit in the cache.
    fn on_cache_hit(&self, size: u64);

    /// Notifies the caller that a cache miss occurred.
    /// `size` is the size of the content that was missed in the cache.
    fn on_cache_miss(&self, size: u64);

    /// Notifies the caller that a blob or directory was inserted into the cache.
    /// `size` is the size of the content that was inserted into the cache.
    ///
    /// Note: not only cache misses will trigger this event, but recoveries and recycles as well.
    fn on_cache_insert(&self, size: u64);

    /// Notifies the caller that a directory was loaded and inserted into the cache.
    /// `duration` is the time it took to load the directory.
    fn on_load_dir(&self, duration: Duration);

    /// Notifies the caller that a blob was loaded and inserted into the cache.
    /// `duration` is the time it took to load the blob.
    fn on_load_blob(&self, duration: Duration);

    /// Notifies the caller that a blob or directory was evicted from the cache.
    /// `size` is the size of the content that was evicted from the cache.
    fn on_cache_evict(&self, size: u64);

    /// Notifies the caller that a blob or directory was dropped to the recycle bin.
    /// `size` is the size of the content that was dropped to the recycle bin.
    fn on_recycle_insert(&self, size: u64);

    /// Notifies the caller that the recycle bin was cleared.
    /// `size` is the size of the content that was cleared from the recycle bin.
    fn on_recycle_clear(&self, size: u64);
}