mito2/sst/index/
puffin_manager.rs1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use object_store::{FuturesAsyncWriter, ObjectStore};
22use puffin::error::{self as puffin_error, Result as PuffinResult};
23use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
24use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
25use puffin::puffin_manager::stager::{BoundedStager, Stager};
26use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
27use snafu::ResultExt;
28
29use crate::access_layer::FilePathProvider;
30use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
31use crate::metrics::{
32 INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
33 INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics,
34};
35use crate::sst::file::RegionIndexId;
36use crate::sst::index::store::{self, InstrumentedStore};
37
// Reader/writer types returned by `InstrumentedStore::range_reader` / `writer`, pinned to
// `'static` lifetimes so they can be used as the `PuffinFileAccessor` associated types below.
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

/// Puffin manager for SST index files: an `FsPuffinManager` that stages through a shared
/// `BoundedStager` keyed by `RegionIndexId` and reaches the files via
/// `ObjectStorePuffinFileAccessor`.
pub(crate) type SstPuffinManager =
    FsPuffinManager<Arc<BoundedStager<RegionIndexId>>, ObjectStorePuffinFileAccessor>;
/// Reader type produced by `SstPuffinManager`.
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
/// Writer type produced by `SstPuffinManager`.
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
/// Blob guard type returned by `SstPuffinReader`.
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
/// Directory guard type returned by `SstPuffinReader`.
pub(crate) type SstPuffinDir = <SstPuffinReader as PuffinReader>::Dir;
/// Reader type obtained from an `SstPuffinBlob` guard.
pub(crate) type BlobReader = <SstPuffinBlob as BlobGuard>::Reader;

/// Name of the sub-directory, under the auxiliary path, that holds the stager's data.
const STAGING_DIR: &str = "staging";
50
51#[derive(Clone)]
53pub struct PuffinManagerFactory {
54 stager: Arc<BoundedStager<RegionIndexId>>,
56
57 write_buffer_size: Option<usize>,
59}
60
61impl PuffinManagerFactory {
62 pub async fn new(
64 aux_path: impl AsRef<Path>,
65 staging_capacity: u64,
66 write_buffer_size: Option<usize>,
67 staging_ttl: Option<Duration>,
68 ) -> Result<Self> {
69 let staging_dir = aux_path.as_ref().join(STAGING_DIR);
70 let stager = BoundedStager::new(
71 staging_dir,
72 staging_capacity,
73 Some(Arc::new(StagerMetrics::default())),
74 staging_ttl,
75 )
76 .await
77 .context(PuffinInitStagerSnafu)?;
78 Ok(Self {
79 stager: Arc::new(stager),
80 write_buffer_size,
81 })
82 }
83
84 pub(crate) fn build(
85 &self,
86 store: ObjectStore,
87 path_provider: impl FilePathProvider + 'static,
88 ) -> SstPuffinManager {
89 let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
90 let puffin_file_accessor =
91 ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
92 SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
93 }
94
95 pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> {
96 self.stager
97 .purge(&file_id)
98 .await
99 .context(PuffinPurgeStagerSnafu)
100 }
101}
102
#[cfg(test)]
impl PuffinManagerFactory {
    /// Test helper: creates a temp dir named with `prefix` and, from an async context, a factory
    /// rooted there (staging capacity 1024, default write buffer, no TTL).
    ///
    /// The returned `TempDir` must be kept alive for as long as the factory is used.
    pub(crate) async fn new_for_test_async(
        prefix: &str,
    ) -> (common_test_util::temp_dir::TempDir, Self) {
        let temp = common_test_util::temp_dir::create_temp_dir(prefix);
        let aux_path = temp.path().to_path_buf();
        let built = Self::new(aux_path, 1024, None, None).await.unwrap();
        (temp, built)
    }

    /// Blocking variant of `new_for_test_async`: drives `new` on the global runtime.
    pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
        let temp = common_test_util::temp_dir::create_temp_dir(prefix);
        let aux_path = temp.path().to_path_buf();

        let pending = Self::new(aux_path, 1024, None, None);
        let built = common_runtime::block_on_global(pending).unwrap();

        (temp, built)
    }
}
124
125#[derive(Clone)]
127pub(crate) struct ObjectStorePuffinFileAccessor {
128 object_store: InstrumentedStore,
129 path_provider: Arc<dyn FilePathProvider>,
130}
131
132impl ObjectStorePuffinFileAccessor {
133 pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
134 Self {
135 object_store,
136 path_provider,
137 }
138 }
139
140 pub fn store(&self) -> &InstrumentedStore {
141 &self.object_store
142 }
143}
144
145#[async_trait]
146impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
147 type Reader = InstrumentedRangeReader;
148 type Writer = InstrumentedAsyncWrite;
149 type FileHandle = RegionIndexId;
150
151 async fn reader(&self, handle: &RegionIndexId) -> PuffinResult<Self::Reader> {
152 let file_path = self
153 .path_provider
154 .build_index_file_path_with_version(*handle);
155 self.object_store
156 .range_reader(
157 &file_path,
158 &INDEX_PUFFIN_READ_BYTES_TOTAL,
159 &INDEX_PUFFIN_READ_OP_TOTAL,
160 )
161 .await
162 .map_err(BoxedError::new)
163 .context(puffin_error::ExternalSnafu)
164 }
165
166 async fn writer(&self, handle: &RegionIndexId) -> PuffinResult<Self::Writer> {
167 let file_path = self
168 .path_provider
169 .build_index_file_path_with_version(*handle);
170 self.object_store
171 .writer(
172 &file_path,
173 &INDEX_PUFFIN_WRITE_BYTES_TOTAL,
174 &INDEX_PUFFIN_WRITE_OP_TOTAL,
175 &INDEX_PUFFIN_FLUSH_OP_TOTAL,
176 )
177 .await
178 .map_err(BoxedError::new)
179 .context(puffin_error::ExternalSnafu)
180 }
181}
182
#[cfg(test)]
mod tests {

    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::io::Cursor;
    use object_store::services::Memory;
    use puffin::blob_metadata::CompressionCodec;
    use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::{RegionFileId, RegionIndexId};

    /// Path provider that uses the bare file id as the path for every kind of file
    /// (the index version is ignored).
    struct TestFilePathProvider;

    impl FilePathProvider for TestFilePathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            index_id.file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

    /// Writes one blob and one zstd-compressed directory through a factory-built manager,
    /// then reads both back and checks the contents.
    #[tokio::test]
    async fn test_puffin_manager_factory() {
        const BLOB_KEY: &str = "blob-key";
        const DIR_KEY: &str = "dir-key";
        let raw_data = b"hello world!";

        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let manager = factory.build(object_store, TestFilePathProvider);
        let index_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);

        // Source directory for `put_dir`: a single file named "hello".
        let src_dir = create_temp_dir("test_puffin_manager_factory_dir_data_");
        tokio::fs::write(src_dir.path().join("hello"), raw_data)
            .await
            .unwrap();

        // Write phase.
        let mut writer = manager.writer(&index_id).await.unwrap();
        writer
            .put_blob(
                BLOB_KEY,
                Cursor::new(raw_data),
                PutOptions::default(),
                Default::default(),
            )
            .await
            .unwrap();
        let zstd_options = PutOptions {
            compression: Some(CompressionCodec::Zstd),
        };
        writer
            .put_dir(
                DIR_KEY,
                src_dir.path().into(),
                zstd_options,
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();

        // Read phase.
        let reader = manager.reader(&index_id).await.unwrap();

        let blob_guard = reader.blob(BLOB_KEY).await.unwrap();
        let blob_reader = blob_guard.reader().await.unwrap();
        let content_length = blob_reader.metadata().await.unwrap().content_length;
        let blob_bytes = blob_reader.read(0..content_length).await.unwrap();
        assert_eq!(&*blob_bytes, raw_data);

        let (dir_guard, _metrics) = reader.dir(DIR_KEY).await.unwrap();
        let restored = tokio::fs::read(dir_guard.path().join("hello"))
            .await
            .unwrap();
        assert_eq!(restored, raw_data);
    }
}