mito2/sst/index/
puffin_manager.rs1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use object_store::{FuturesAsyncWriter, ObjectStore};
22use puffin::error::{self as puffin_error, Result as PuffinResult};
23use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
24use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
25use puffin::puffin_manager::stager::{BoundedStager, Stager};
26use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
27use snafu::ResultExt;
28
29use crate::access_layer::FilePathProvider;
30use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
31use crate::metrics::{
32 StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
33 INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
34};
35use crate::sst::file::FileId;
36use crate::sst::index::store::{self, InstrumentedStore};
37
/// Range reader over an index file in the object store, instrumented with metrics.
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
/// Async writer for an index file in the object store, instrumented with metrics.
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

/// Puffin manager used for SST index files.
///
/// It is a [`FsPuffinManager`] backed by a shared [`BoundedStager`] keyed by [`FileId`],
/// and it accesses puffin files through [`ObjectStorePuffinFileAccessor`].
pub(crate) type SstPuffinManager =
    FsPuffinManager<Arc<BoundedStager<FileId>>, ObjectStorePuffinFileAccessor>;
/// Reader type produced by [`SstPuffinManager`].
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
/// Writer type produced by [`SstPuffinManager`].
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
/// Blob guard type returned by [`SstPuffinReader`].
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
/// Directory guard type returned by [`SstPuffinReader`].
pub(crate) type SstPuffinDir = <SstPuffinReader as PuffinReader>::Dir;
/// Reader over the content of an [`SstPuffinBlob`].
pub(crate) type BlobReader = <SstPuffinBlob as BlobGuard>::Reader;

/// Name of the staging subdirectory created under the factory's auxiliary path.
const STAGING_DIR: &str = "staging";
50
/// Factory that builds [`SstPuffinManager`]s which all share one staging area.
#[derive(Clone)]
pub struct PuffinManagerFactory {
    /// Stager shared (through `Arc`) by every manager built by this factory.
    stager: Arc<BoundedStager<FileId>>,

    /// Optional write buffer size applied to the instrumented store of each built manager.
    write_buffer_size: Option<usize>,
}
60
61impl PuffinManagerFactory {
62 pub async fn new(
64 aux_path: impl AsRef<Path>,
65 staging_capacity: u64,
66 write_buffer_size: Option<usize>,
67 staging_ttl: Option<Duration>,
68 ) -> Result<Self> {
69 let staging_dir = aux_path.as_ref().join(STAGING_DIR);
70 let stager = BoundedStager::new(
71 staging_dir,
72 staging_capacity,
73 Some(Arc::new(StagerMetrics::default())),
74 staging_ttl,
75 )
76 .await
77 .context(PuffinInitStagerSnafu)?;
78 Ok(Self {
79 stager: Arc::new(stager),
80 write_buffer_size,
81 })
82 }
83
84 pub(crate) fn build(
85 &self,
86 store: ObjectStore,
87 path_provider: impl FilePathProvider + 'static,
88 ) -> SstPuffinManager {
89 let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
90 let puffin_file_accessor =
91 ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
92 SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
93 }
94
95 pub(crate) async fn purge_stager(&self, file_id: FileId) -> Result<()> {
96 self.stager
97 .purge(&file_id)
98 .await
99 .context(PuffinPurgeStagerSnafu)
100 }
101}
102
#[cfg(test)]
impl PuffinManagerFactory {
    /// Creates a test factory rooted in a fresh temp dir whose name starts with `prefix`.
    ///
    /// The staging directory lives inside the returned temp dir. Keep the `TempDir`
    /// alive for as long as the factory is used; it is presumably cleaned up on drop.
    pub(crate) async fn new_for_test_async(
        prefix: &str,
    ) -> (common_test_util::temp_dir::TempDir, Self) {
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let aux_path = tempdir.path().to_path_buf();
        let factory = Self::new(aux_path, 1024, None, None).await.unwrap();
        (tempdir, factory)
    }

    /// Blocking variant of [`Self::new_for_test_async`], driven by the global runtime.
    pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let aux_path = tempdir.path().to_path_buf();
        let pending = Self::new(aux_path, 1024, None, None);
        let factory = common_runtime::block_on_global(pending).unwrap();
        (tempdir, factory)
    }
}
124
/// [`PuffinFileAccessor`] that opens puffin index files stored in an object store.
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
    /// Instrumented object store used to open index file readers and writers.
    object_store: InstrumentedStore,
    /// Resolves a [`FileId`] to the path of its index file.
    path_provider: Arc<dyn FilePathProvider>,
}
131
132impl ObjectStorePuffinFileAccessor {
133 pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
134 Self {
135 object_store,
136 path_provider,
137 }
138 }
139}
140
141#[async_trait]
142impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
143 type Reader = InstrumentedRangeReader;
144 type Writer = InstrumentedAsyncWrite;
145 type FileHandle = FileId;
146
147 async fn reader(&self, handle: &FileId) -> PuffinResult<Self::Reader> {
148 let file_path = self.path_provider.build_index_file_path(*handle);
149 self.object_store
150 .range_reader(
151 &file_path,
152 &INDEX_PUFFIN_READ_BYTES_TOTAL,
153 &INDEX_PUFFIN_READ_OP_TOTAL,
154 )
155 .await
156 .map_err(BoxedError::new)
157 .context(puffin_error::ExternalSnafu)
158 }
159
160 async fn writer(&self, handle: &FileId) -> PuffinResult<Self::Writer> {
161 let file_path = self.path_provider.build_index_file_path(*handle);
162 self.object_store
163 .writer(
164 &file_path,
165 &INDEX_PUFFIN_WRITE_BYTES_TOTAL,
166 &INDEX_PUFFIN_WRITE_OP_TOTAL,
167 &INDEX_PUFFIN_FLUSH_OP_TOTAL,
168 )
169 .await
170 .map_err(BoxedError::new)
171 .context(puffin_error::ExternalSnafu)
172 }
173}
174
#[cfg(test)]
mod tests {

    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::io::Cursor;
    use object_store::services::Memory;
    use puffin::blob_metadata::CompressionCodec;
    use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};

    use super::*;

    /// Path provider that uses the file id's string form as both the index and SST path.
    struct TestFilePathProvider;

    impl FilePathProvider for TestFilePathProvider {
        fn build_index_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }

        fn build_sst_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }
    }

    /// Writes a raw blob and a zstd-compressed directory through a manager built by the
    /// factory, then reads both back and checks their contents.
    #[tokio::test]
    async fn test_puffin_manager_factory() {
        const BLOB_KEY: &str = "blob-key";
        const DIR_KEY: &str = "dir-key";
        const PAYLOAD: &[u8; 12] = b"hello world!";

        // `_dir` (not `_`) keeps the staging temp dir alive for the whole test.
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
        let memory_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let manager = factory.build(memory_store, TestFilePathProvider);
        let id = FileId::random();

        // Write side.
        let mut puffin_writer = manager.writer(&id).await.unwrap();
        puffin_writer
            .put_blob(
                BLOB_KEY,
                Cursor::new(PAYLOAD),
                PutOptions::default(),
                Default::default(),
            )
            .await
            .unwrap();

        let source_dir = create_temp_dir("test_puffin_manager_factory_dir_data_");
        let source_file = source_dir.path().join("hello");
        tokio::fs::write(source_file, PAYLOAD).await.unwrap();
        let dir_options = PutOptions {
            compression: Some(CompressionCodec::Zstd),
        };
        puffin_writer
            .put_dir(
                DIR_KEY,
                source_dir.path().into(),
                dir_options,
                Default::default(),
            )
            .await
            .unwrap();
        puffin_writer.finish().await.unwrap();

        // Read side.
        let puffin_reader = manager.reader(&id).await.unwrap();

        let blob = puffin_reader.blob(BLOB_KEY).await.unwrap();
        let blob_reader = blob.reader().await.unwrap();
        let blob_len = blob_reader.metadata().await.unwrap().content_length;
        let bytes = blob_reader.read(0..blob_len).await.unwrap();
        assert_eq!(&*bytes, PAYLOAD);

        let dir = puffin_reader.dir(DIR_KEY).await.unwrap();
        let content = tokio::fs::read(dir.path().join("hello")).await.unwrap();
        assert_eq!(content, PAYLOAD);
    }
}