mito2/sst/index/puffin_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use object_store::{FuturesAsyncWriter, ObjectStore};
22use puffin::error::{self as puffin_error, Result as PuffinResult};
23use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
24use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
25use puffin::puffin_manager::stager::{BoundedStager, Stager};
26use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
27use snafu::ResultExt;
28
29use crate::access_layer::FilePathProvider;
30use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
31use crate::metrics::{
32    INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
33    INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL, StagerMetrics,
34};
35use crate::sst::file::RegionIndexId;
36use crate::sst::index::store::{self, InstrumentedStore};
37
38type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
39type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
40
41pub(crate) type SstPuffinManager =
42    FsPuffinManager<Arc<BoundedStager<RegionIndexId>>, ObjectStorePuffinFileAccessor>;
43pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
44pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
45pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
46pub(crate) type SstPuffinDir = <SstPuffinReader as PuffinReader>::Dir;
47pub(crate) type BlobReader = <SstPuffinBlob as BlobGuard>::Reader;
48
49const STAGING_DIR: &str = "staging";
50
51/// A factory for creating `SstPuffinManager` instances.
52#[derive(Clone)]
53pub struct PuffinManagerFactory {
54    /// The stager used by the puffin manager.
55    stager: Arc<BoundedStager<RegionIndexId>>,
56
57    /// The size of the write buffer used to create object store.
58    write_buffer_size: Option<usize>,
59}
60
61impl PuffinManagerFactory {
62    /// Creates a new `PuffinManagerFactory` instance.
63    pub async fn new(
64        aux_path: impl AsRef<Path>,
65        staging_capacity: u64,
66        write_buffer_size: Option<usize>,
67        staging_ttl: Option<Duration>,
68    ) -> Result<Self> {
69        let staging_dir = aux_path.as_ref().join(STAGING_DIR);
70        let stager = BoundedStager::new(
71            staging_dir,
72            staging_capacity,
73            Some(Arc::new(StagerMetrics::default())),
74            staging_ttl,
75        )
76        .await
77        .context(PuffinInitStagerSnafu)?;
78        Ok(Self {
79            stager: Arc::new(stager),
80            write_buffer_size,
81        })
82    }
83
84    pub(crate) fn build(
85        &self,
86        store: ObjectStore,
87        path_provider: impl FilePathProvider + 'static,
88    ) -> SstPuffinManager {
89        let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
90        let puffin_file_accessor =
91            ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
92        SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
93    }
94
95    pub(crate) async fn purge_stager(&self, file_id: RegionIndexId) -> Result<()> {
96        self.stager
97            .purge(&file_id)
98            .await
99            .context(PuffinPurgeStagerSnafu)
100    }
101}
102
#[cfg(test)]
impl PuffinManagerFactory {
    /// Staging capacity used by the test constructors below.
    const TEST_STAGING_CAPACITY: u64 = 1024;

    /// Creates a factory rooted in a fresh temporary directory named with `prefix`.
    ///
    /// The temp dir is returned alongside the factory so the caller controls its
    /// lifetime. Panics if the factory cannot be created.
    pub(crate) async fn new_for_test_async(
        prefix: &str,
    ) -> (common_test_util::temp_dir::TempDir, Self) {
        let temp_dir = common_test_util::temp_dir::create_temp_dir(prefix);
        let root = temp_dir.path().to_path_buf();
        let factory = Self::new(root, Self::TEST_STAGING_CAPACITY, None, None)
            .await
            .unwrap();
        (temp_dir, factory)
    }

    /// Blocking counterpart of [`Self::new_for_test_async`].
    ///
    /// Drives construction to completion with `common_runtime::block_on_global`.
    /// Panics if the factory cannot be created.
    pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
        let temp_dir = common_test_util::temp_dir::create_temp_dir(prefix);
        let root = temp_dir.path().to_path_buf();

        let pending = Self::new(root, Self::TEST_STAGING_CAPACITY, None, None);
        let factory = common_runtime::block_on_global(pending).unwrap();

        (temp_dir, factory)
    }
}
124
125/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
126#[derive(Clone)]
127pub(crate) struct ObjectStorePuffinFileAccessor {
128    object_store: InstrumentedStore,
129    path_provider: Arc<dyn FilePathProvider>,
130}
131
132impl ObjectStorePuffinFileAccessor {
133    pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
134        Self {
135            object_store,
136            path_provider,
137        }
138    }
139
140    pub fn store(&self) -> &InstrumentedStore {
141        &self.object_store
142    }
143}
144
145#[async_trait]
146impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
147    type Reader = InstrumentedRangeReader;
148    type Writer = InstrumentedAsyncWrite;
149    type FileHandle = RegionIndexId;
150
151    async fn reader(&self, handle: &RegionIndexId) -> PuffinResult<Self::Reader> {
152        let file_path = self
153            .path_provider
154            .build_index_file_path_with_version(*handle);
155        self.object_store
156            .range_reader(
157                &file_path,
158                &INDEX_PUFFIN_READ_BYTES_TOTAL,
159                &INDEX_PUFFIN_READ_OP_TOTAL,
160            )
161            .await
162            .map_err(BoxedError::new)
163            .context(puffin_error::ExternalSnafu)
164    }
165
166    async fn writer(&self, handle: &RegionIndexId) -> PuffinResult<Self::Writer> {
167        let file_path = self
168            .path_provider
169            .build_index_file_path_with_version(*handle);
170        self.object_store
171            .writer(
172                &file_path,
173                &INDEX_PUFFIN_WRITE_BYTES_TOTAL,
174                &INDEX_PUFFIN_WRITE_OP_TOTAL,
175                &INDEX_PUFFIN_FLUSH_OP_TOTAL,
176            )
177            .await
178            .map_err(BoxedError::new)
179            .context(puffin_error::ExternalSnafu)
180    }
181}
182
#[cfg(test)]
mod tests {

    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::io::Cursor;
    use object_store::services::Memory;
    use puffin::blob_metadata::CompressionCodec;
    use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::{RegionFileId, RegionIndexId};

    /// Test path provider: every path is the string form of the file id.
    ///
    /// The index version is ignored, so all versions of an index map to the same path.
    struct TestFilePathProvider;

    impl FilePathProvider for TestFilePathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
            index_id.file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

    /// Writes one blob and one zstd-compressed directory through a manager built by the
    /// factory over an in-memory store, then reads both back and checks their contents.
    #[tokio::test]
    async fn test_puffin_manager_factory() {
        let (_staging_dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
        let memory_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let puffin_manager = factory.build(memory_store, TestFilePathProvider);

        let index_id = RegionIndexId::new(RegionFileId::new(0.into(), FileId::random()), 0);
        let (blob_key, dir_key) = ("blob-key", "dir-key");
        let raw_data = b"hello world!";

        // Source directory for `put_dir`: it holds a single file named "hello".
        let dir_source = create_temp_dir("test_puffin_manager_factory_dir_data_");
        tokio::fs::write(dir_source.path().join("hello"), raw_data)
            .await
            .unwrap();

        // Write phase: one raw blob plus one compressed directory.
        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
        writer
            .put_blob(
                blob_key,
                Cursor::new(raw_data),
                PutOptions::default(),
                Default::default(),
            )
            .await
            .unwrap();
        let zstd_options = PutOptions {
            compression: Some(CompressionCodec::Zstd),
        };
        writer
            .put_dir(
                dir_key,
                dir_source.path().into(),
                zstd_options,
                Default::default(),
            )
            .await
            .unwrap();
        writer.finish().await.unwrap();

        // Read phase: both entries must round-trip byte-for-byte.
        let reader = puffin_manager.reader(&index_id).await.unwrap();

        let blob_guard = reader.blob(blob_key).await.unwrap();
        let blob_reader = blob_guard.reader().await.unwrap();
        let blob_len = blob_reader.metadata().await.unwrap().content_length;
        let blob_bytes = blob_reader.read(0..blob_len).await.unwrap();
        assert_eq!(&*blob_bytes, raw_data);

        let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
        let restored = tokio::fs::read(dir_guard.path().join("hello"))
            .await
            .unwrap();
        assert_eq!(restored, raw_data);
    }
}