// mito2/sst/index/puffin_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use object_store::{FuturesAsyncWriter, ObjectStore};
22use puffin::error::{self as puffin_error, Result as PuffinResult};
23use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
24use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
25use puffin::puffin_manager::stager::{BoundedStager, Stager};
26use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
27use snafu::ResultExt;
28
29use crate::access_layer::FilePathProvider;
30use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
31use crate::metrics::{
32    StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
33    INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
34};
35use crate::sst::file::FileId;
36use crate::sst::index::store::{self, InstrumentedStore};
37
38type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
39type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
40
41pub(crate) type SstPuffinManager =
42    FsPuffinManager<Arc<BoundedStager<FileId>>, ObjectStorePuffinFileAccessor>;
43pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
44pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
45pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
46pub(crate) type SstPuffinDir = <SstPuffinReader as PuffinReader>::Dir;
47pub(crate) type BlobReader = <SstPuffinBlob as BlobGuard>::Reader;
48
49const STAGING_DIR: &str = "staging";
50
51/// A factory for creating `SstPuffinManager` instances.
52#[derive(Clone)]
53pub struct PuffinManagerFactory {
54    /// The stager used by the puffin manager.
55    stager: Arc<BoundedStager<FileId>>,
56
57    /// The size of the write buffer used to create object store.
58    write_buffer_size: Option<usize>,
59}
60
61impl PuffinManagerFactory {
62    /// Creates a new `PuffinManagerFactory` instance.
63    pub async fn new(
64        aux_path: impl AsRef<Path>,
65        staging_capacity: u64,
66        write_buffer_size: Option<usize>,
67        staging_ttl: Option<Duration>,
68    ) -> Result<Self> {
69        let staging_dir = aux_path.as_ref().join(STAGING_DIR);
70        let stager = BoundedStager::new(
71            staging_dir,
72            staging_capacity,
73            Some(Arc::new(StagerMetrics::default())),
74            staging_ttl,
75        )
76        .await
77        .context(PuffinInitStagerSnafu)?;
78        Ok(Self {
79            stager: Arc::new(stager),
80            write_buffer_size,
81        })
82    }
83
84    pub(crate) fn build(
85        &self,
86        store: ObjectStore,
87        path_provider: impl FilePathProvider + 'static,
88    ) -> SstPuffinManager {
89        let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
90        let puffin_file_accessor =
91            ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
92        SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
93    }
94
95    pub(crate) async fn purge_stager(&self, file_id: FileId) -> Result<()> {
96        self.stager
97            .purge(&file_id)
98            .await
99            .context(PuffinPurgeStagerSnafu)
100    }
101}
102
#[cfg(test)]
impl PuffinManagerFactory {
    /// Creates a factory rooted in a fresh temp dir whose name starts with `prefix`.
    ///
    /// The factory uses a staging capacity of 1024 and has no write-buffer size or staging TTL.
    /// Keep the returned temp dir alive while the factory is in use (it presumably cleans up on
    /// drop).
    ///
    /// # Panics
    /// Panics if the factory cannot be created.
    pub(crate) async fn new_for_test_async(
        prefix: &str,
    ) -> (common_test_util::temp_dir::TempDir, Self) {
        let temp_dir = common_test_util::temp_dir::create_temp_dir(prefix);
        let aux_path = temp_dir.path().to_path_buf();
        let factory = Self::new(aux_path, 1024, None, None).await.unwrap();
        (temp_dir, factory)
    }

    /// Blocking counterpart of [`Self::new_for_test_async`], driven on the global runtime.
    ///
    /// # Panics
    /// Panics if the factory cannot be created.
    pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
        common_runtime::block_on_global(Self::new_for_test_async(prefix))
    }
}
124
125/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
126#[derive(Clone)]
127pub(crate) struct ObjectStorePuffinFileAccessor {
128    object_store: InstrumentedStore,
129    path_provider: Arc<dyn FilePathProvider>,
130}
131
132impl ObjectStorePuffinFileAccessor {
133    pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
134        Self {
135            object_store,
136            path_provider,
137        }
138    }
139}
140
141#[async_trait]
142impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
143    type Reader = InstrumentedRangeReader;
144    type Writer = InstrumentedAsyncWrite;
145    type FileHandle = FileId;
146
147    async fn reader(&self, handle: &FileId) -> PuffinResult<Self::Reader> {
148        let file_path = self.path_provider.build_index_file_path(*handle);
149        self.object_store
150            .range_reader(
151                &file_path,
152                &INDEX_PUFFIN_READ_BYTES_TOTAL,
153                &INDEX_PUFFIN_READ_OP_TOTAL,
154            )
155            .await
156            .map_err(BoxedError::new)
157            .context(puffin_error::ExternalSnafu)
158    }
159
160    async fn writer(&self, handle: &FileId) -> PuffinResult<Self::Writer> {
161        let file_path = self.path_provider.build_index_file_path(*handle);
162        self.object_store
163            .writer(
164                &file_path,
165                &INDEX_PUFFIN_WRITE_BYTES_TOTAL,
166                &INDEX_PUFFIN_WRITE_OP_TOTAL,
167                &INDEX_PUFFIN_FLUSH_OP_TOTAL,
168            )
169            .await
170            .map_err(BoxedError::new)
171            .context(puffin_error::ExternalSnafu)
172    }
173}
174
#[cfg(test)]
mod tests {
    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::io::Cursor;
    use object_store::services::Memory;
    use puffin::blob_metadata::CompressionCodec;
    use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};

    use super::*;

    /// Path provider that uses the bare file id as both the SST and the index file path.
    struct TestFilePathProvider;

    impl FilePathProvider for TestFilePathProvider {
        fn build_index_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }

        fn build_sst_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }
    }

    /// Round-trips one blob and one zstd-compressed directory through a manager built by the
    /// factory over an in-memory object store.
    #[tokio::test]
    async fn test_puffin_manager_factory() {
        let (_staging_root, factory) =
            PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
        let memory_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let manager = factory.build(memory_store, TestFilePathProvider);

        let file_id = FileId::random();
        let (blob_key, dir_key) = ("blob-key", "dir-key");
        let payload = b"hello world!";

        // Write phase: one raw blob, then a directory holding a single file.
        let mut puffin_writer = manager.writer(&file_id).await.unwrap();
        puffin_writer
            .put_blob(blob_key, Cursor::new(payload), PutOptions::default(), Default::default())
            .await
            .unwrap();

        let source_dir = create_temp_dir("test_puffin_manager_factory_dir_data_");
        tokio::fs::write(source_dir.path().join("hello"), payload)
            .await
            .unwrap();
        let zstd = PutOptions {
            compression: Some(CompressionCodec::Zstd),
        };
        puffin_writer
            .put_dir(dir_key, source_dir.path().into(), zstd, Default::default())
            .await
            .unwrap();
        puffin_writer.finish().await.unwrap();

        // Read phase: the blob bytes and the directory's file must match what was written.
        let puffin_reader = manager.reader(&file_id).await.unwrap();

        let blob = puffin_reader.blob(blob_key).await.unwrap();
        let blob_reader = blob.reader().await.unwrap();
        let blob_len = blob_reader.metadata().await.unwrap().content_length;
        let blob_bytes = blob_reader.read(0..blob_len).await.unwrap();
        assert_eq!(&*blob_bytes, payload);

        let dir = puffin_reader.dir(dir_key).await.unwrap();
        let restored = tokio::fs::read(dir.path().join("hello")).await.unwrap();
        assert_eq!(restored, payload);
    }
}