puffin/puffin_manager/
stager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod bounded_stager;

use std::path::PathBuf;
use std::time::Duration;

use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use futures::future::BoxFuture;
use futures::AsyncWrite;

use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};

/// A type-erased, boxed asynchronous writer.
///
/// Handed to blob initializers (see `InitBlobFn`) and returned by
/// `DirWriterProvider::writer`, so callers can stream content into the staging
/// area without knowing the concrete writer type.
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;

/// A boxed, `'static` future that resolves to the number of bytes written
/// (`u64`), or to an error if writing failed.
///
/// Note that this is a *future* yielding a `Result`, not a `Result` itself.
pub type WriteResult = BoxFuture<'static, Result<u64>>;

/// Hands out writers for the files that make up a directory being staged.
///
/// Directory initializers receive one of these (boxed as
/// [`DirWriterProviderRef`]) and can request a writer for each file they need
/// to write.
#[async_trait]
pub trait DirWriterProvider {
    /// Opens a [`BoxWriter`] for the file at `relative_path` inside the directory.
    async fn writer(&self, relative_path: &str) -> Result<BoxWriter>;
}

/// Owned, type-erased [`DirWriterProvider`] that is `Send`.
///
/// Despite the `Ref` suffix this is a `Box`, i.e. an owning pointer.
pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;

/// One-shot callback used to populate a blob.
///
/// The `Stager` passes in a [`BoxWriter`] that targets its staging area; the
/// callback writes the blob's bytes through that writer and returns a
/// [`WriteResult`] future yielding how many bytes it wrote.
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;

/// One-shot callback used to populate a directory.
///
/// The `Stager` passes in a [`DirWriterProviderRef`]; the callback obtains a
/// writer per file via [`DirWriterProvider::writer`] and returns a
/// [`WriteResult`] future yielding a byte count (presumably the total across
/// all files written — confirm against implementations).
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;

/// `Stager` manages the staging area for the puffin files.
///
/// Content is addressed by a puffin file handle plus a `key`; blobs and
/// directories are initialized on demand through caller-supplied callbacks.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait Stager: Send + Sync {
    /// Guard returned by [`Stager::get_blob`].
    type Blob: BlobGuard + Sync;
    /// Guard returned by [`Stager::get_dir`].
    type Dir: DirGuard;
    /// Handle identifying the puffin file whose content is staged.
    type FileHandle: ToString + Clone + Send + Sync;

    /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
    ///
    /// `handle` identifies the puffin file and `key` selects the blob within it.
    /// When initialization is needed, `init_fn` receives a [`BoxWriter`] into the
    /// staging area and writes the blob's contents through it.
    ///
    /// The returned `BlobGuard` is used to access the blob reader.
    /// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
    async fn get_blob<'a>(
        &self,
        handle: &Self::FileHandle,
        key: &str,
        init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
    ) -> Result<Self::Blob>;

    /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
    ///
    /// `handle` identifies the puffin file and `key` selects the directory within it.
    /// When initialization is needed, `init_fn` receives a [`DirWriterProviderRef`]
    /// through which it writes the directory's files into the staging area.
    ///
    /// The returned `DirGuard` is used to access the directory in the filesystem.
    /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
    async fn get_dir<'a>(
        &self,
        handle: &Self::FileHandle,
        key: &str,
        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
    ) -> Result<Self::Dir>;

    /// Stores a directory in the staging area.
    ///
    /// `dir_path` points to the directory to store under (`handle`, `key`), and
    /// `dir_size` is its size (presumably in bytes — confirm with implementations).
    async fn put_dir(
        &self,
        handle: &Self::FileHandle,
        key: &str,
        dir_path: PathBuf,
        dir_size: u64,
    ) -> Result<()>;

    /// Purges all content for the given puffin file from the staging area.
    async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
}

/// Observer of staging events: cache lookups, cache membership changes,
/// load timings and recycle-bin activity.
///
/// Implementations must be `Send + Sync`.
pub trait StagerNotifier: Send + Sync {
    // Cache lookups.

    /// Invoked on a cache hit; `size` is the size of the content found in the cache.
    fn on_cache_hit(&self, size: u64);

    /// Invoked on a cache miss; `size` is the size of the content that was not cached.
    fn on_cache_miss(&self, size: u64);

    // Cache membership.

    /// Invoked when a blob or directory enters the cache; `size` is the size of
    /// the inserted content.
    ///
    /// This fires not only after cache misses but also on recoveries and recycles.
    fn on_cache_insert(&self, size: u64);

    /// Invoked when a blob or directory leaves the cache; `size` is the size of
    /// the evicted content.
    fn on_cache_evict(&self, size: u64);

    // Load timings.

    /// Invoked after a blob has been loaded into the cache; `duration` is how
    /// long loading the blob took.
    fn on_load_blob(&self, duration: Duration);

    /// Invoked after a directory has been loaded into the cache; `duration` is
    /// how long loading the directory took.
    fn on_load_dir(&self, duration: Duration);

    // Recycle bin.

    /// Invoked when a blob or directory is moved to the recycle bin; `size` is
    /// the size of the recycled content.
    fn on_recycle_insert(&self, size: u64);

    /// Invoked when the recycle bin is cleared; `size` is the size of the
    /// content removed from it.
    fn on_recycle_clear(&self, size: u64);
}