puffin/puffin_manager/
stager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bounded_stager;
16
17use std::path::PathBuf;
18use std::time::Duration;
19
20use async_trait::async_trait;
21pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
22use futures::future::BoxFuture;
23use futures::AsyncWrite;
24
25use crate::error::Result;
26use crate::puffin_manager::{BlobGuard, DirGuard};
27
28pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
29
30/// Result containing the number of bytes written (u64).
31pub type WriteResult = BoxFuture<'static, Result<u64>>;
32
33/// `DirWriterProvider` provides a way to write files into a directory.
34#[async_trait]
35pub trait DirWriterProvider {
36    /// Creates a writer for the given relative path.
37    async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
38}
39
40pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
41
42/// Function that initializes a blob.
43///
44/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
45/// can use to write the blob into the staging area.
46pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
47
48/// Function that initializes a directory.
49///
50/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
51/// can use to write files inside the directory into the staging area.
52pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
53
54/// `Stager` manages the staging area for the puffin files.
55#[async_trait]
56#[auto_impl::auto_impl(Arc)]
57pub trait Stager: Send + Sync {
58    type Blob: BlobGuard + Sync;
59    type Dir: DirGuard;
60    type FileHandle: ToString + Clone + Send + Sync;
61
62    /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
63    ///
64    /// The returned `BlobGuard` is used to access the blob reader.
65    /// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
66    async fn get_blob<'a>(
67        &self,
68        handle: &Self::FileHandle,
69        key: &str,
70        init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
71    ) -> Result<Self::Blob>;
72
73    /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
74    ///
75    /// The returned `DirGuard` is used to access the directory in the filesystem.
76    /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
77    async fn get_dir<'a>(
78        &self,
79        handle: &Self::FileHandle,
80        key: &str,
81        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
82    ) -> Result<Self::Dir>;
83
84    /// Stores a directory in the staging area.
85    async fn put_dir(
86        &self,
87        handle: &Self::FileHandle,
88        key: &str,
89        dir_path: PathBuf,
90        dir_size: u64,
91    ) -> Result<()>;
92
93    /// Purges all content for the given puffin file from the staging area.
94    async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
95}
96
/// Receives notifications about events happening in the staging area.
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait StagerNotifier: Send + Sync {
    // ---- Cache lookups -------------------------------------------------------------

    /// Invoked when requested content is already present in the cache.
    /// `size` is the size of the content that was found.
    fn on_cache_hit(&self, size: u64);

    /// Invoked when requested content is absent from the cache.
    /// `size` is the size of the content that was missing.
    fn on_cache_miss(&self, size: u64);

    // ---- Cache membership ----------------------------------------------------------

    /// Invoked when a blob or directory is added to the cache.
    /// `size` is the size of the added content.
    ///
    /// Note: this event fires for cache misses, and also for recoveries and recycles.
    fn on_cache_insert(&self, size: u64);

    /// Invoked when a blob or directory is removed from the cache by eviction.
    /// `size` is the size of the evicted content.
    fn on_cache_evict(&self, size: u64);

    // ---- Loading -------------------------------------------------------------------

    /// Invoked once a blob has been inserted into the cache.
    /// `duration` is how long loading the blob took.
    fn on_load_blob(&self, duration: Duration);

    /// Invoked once a directory has been inserted into the cache.
    /// `duration` is how long loading the directory took.
    fn on_load_dir(&self, duration: Duration);

    // ---- Recycle bin ---------------------------------------------------------------

    /// Invoked when a blob or directory is moved into the recycle bin.
    /// `size` is the size of the content moved there.
    fn on_recycle_insert(&self, size: u64);

    /// Invoked when the recycle bin is emptied.
    /// `size` is the size of the content that was cleared.
    fn on_recycle_clear(&self, size: u64);
}