mito2/
access_layer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use object_store::services::Fs;
18use object_store::util::{join_dir, with_instrument_layers};
19use object_store::ObjectStore;
20use smallvec::SmallVec;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
26use crate::cache::write_cache::SstUploadRequest;
27use crate::cache::CacheManagerRef;
28use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
29use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
30use crate::read::Source;
31use crate::region::options::IndexOptions;
32use crate::sst::file::{FileHandle, FileId, FileMeta};
33use crate::sst::index::intermediate::IntermediateManager;
34use crate::sst::index::puffin_manager::PuffinManagerFactory;
35use crate::sst::index::IndexerBuilderImpl;
36use crate::sst::location;
37use crate::sst::parquet::reader::ParquetReaderBuilder;
38use crate::sst::parquet::writer::ParquetWriter;
39use crate::sst::parquet::{SstInfo, WriteOptions};
40
41pub type AccessLayerRef = Arc<AccessLayer>;
42/// SST write results.
43pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
44
45/// A layer to access SST files under the same directory.
46pub struct AccessLayer {
47    region_dir: String,
48    /// Target object store.
49    object_store: ObjectStore,
50    /// Puffin manager factory for index.
51    puffin_manager_factory: PuffinManagerFactory,
52    /// Intermediate manager for inverted index.
53    intermediate_manager: IntermediateManager,
54}
55
56impl std::fmt::Debug for AccessLayer {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("AccessLayer")
59            .field("region_dir", &self.region_dir)
60            .finish()
61    }
62}
63
64impl AccessLayer {
65    /// Returns a new [AccessLayer] for specific `region_dir`.
66    pub fn new(
67        region_dir: impl Into<String>,
68        object_store: ObjectStore,
69        puffin_manager_factory: PuffinManagerFactory,
70        intermediate_manager: IntermediateManager,
71    ) -> AccessLayer {
72        AccessLayer {
73            region_dir: region_dir.into(),
74            object_store,
75            puffin_manager_factory,
76            intermediate_manager,
77        }
78    }
79
80    /// Returns the directory of the region.
81    pub fn region_dir(&self) -> &str {
82        &self.region_dir
83    }
84
85    /// Returns the object store of the layer.
86    pub fn object_store(&self) -> &ObjectStore {
87        &self.object_store
88    }
89
90    /// Returns the puffin manager factory.
91    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
92        &self.puffin_manager_factory
93    }
94
95    /// Deletes a SST file (and its index file if it has one) with given file id.
96    pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
97        let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
98        self.object_store
99            .delete(&path)
100            .await
101            .context(DeleteSstSnafu {
102                file_id: file_meta.file_id,
103            })?;
104
105        let path = location::index_file_path(&self.region_dir, file_meta.file_id);
106        self.object_store
107            .delete(&path)
108            .await
109            .context(DeleteIndexSnafu {
110                file_id: file_meta.file_id,
111            })?;
112
113        Ok(())
114    }
115
116    /// Returns a reader builder for specific `file`.
117    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
118        ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
119    }
120
121    /// Writes a SST with specific `file_id` and `metadata` to the layer.
122    ///
123    /// Returns the info of the SST. If no data written, returns None.
124    pub async fn write_sst(
125        &self,
126        request: SstWriteRequest,
127        write_opts: &WriteOptions,
128    ) -> Result<SstInfoArray> {
129        let region_id = request.metadata.region_id;
130        let cache_manager = request.cache_manager.clone();
131
132        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
133            // Write to the write cache.
134            write_cache
135                .write_and_upload_sst(
136                    request,
137                    SstUploadRequest {
138                        dest_path_provider: RegionFilePathFactory {
139                            region_dir: self.region_dir.clone(),
140                        },
141                        remote_store: self.object_store.clone(),
142                    },
143                    write_opts,
144                )
145                .await?
146        } else {
147            // Write cache is disabled.
148            let store = self.object_store.clone();
149            let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
150            let indexer_builder = IndexerBuilderImpl {
151                op_type: request.op_type,
152                metadata: request.metadata.clone(),
153                row_group_size: write_opts.row_group_size,
154                puffin_manager: self
155                    .puffin_manager_factory
156                    .build(store, path_provider.clone()),
157                intermediate_manager: self.intermediate_manager.clone(),
158                index_options: request.index_options,
159                inverted_index_config: request.inverted_index_config,
160                fulltext_index_config: request.fulltext_index_config,
161                bloom_filter_index_config: request.bloom_filter_index_config,
162            };
163            let mut writer = ParquetWriter::new_with_object_store(
164                self.object_store.clone(),
165                request.metadata,
166                indexer_builder,
167                path_provider,
168            )
169            .await;
170            writer
171                .write_all(request.source, request.max_sequence, write_opts)
172                .await?
173        };
174
175        // Put parquet metadata to cache manager.
176        if !sst_info.is_empty() {
177            for sst in &sst_info {
178                if let Some(parquet_metadata) = &sst.file_metadata {
179                    cache_manager.put_parquet_meta_data(
180                        region_id,
181                        sst.file_id,
182                        parquet_metadata.clone(),
183                    )
184                }
185            }
186        }
187
188        Ok(sst_info)
189    }
190}
191
/// Origin of an `SstWriteRequest`: which kind of operation asked for the
/// SST to be written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The write request comes from a flush.
    Flush,
    /// The write request comes from a compaction.
    Compact,
}
198
199/// Contents to build a SST.
200pub struct SstWriteRequest {
201    pub op_type: OperationType,
202    pub metadata: RegionMetadataRef,
203    pub source: Source,
204    pub cache_manager: CacheManagerRef,
205    #[allow(dead_code)]
206    pub storage: Option<String>,
207    pub max_sequence: Option<SequenceNumber>,
208
209    /// Configs for index
210    pub index_options: IndexOptions,
211    pub inverted_index_config: InvertedIndexConfig,
212    pub fulltext_index_config: FulltextIndexConfig,
213    pub bloom_filter_index_config: BloomFilterConfig,
214}
215
216pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
217    let atomic_write_dir = join_dir(root, ".tmp/");
218    clean_dir(&atomic_write_dir).await?;
219
220    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
221    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
222
223    Ok(with_instrument_layers(store, false))
224}
225
226/// Clean the directory.
227async fn clean_dir(dir: &str) -> Result<()> {
228    if tokio::fs::try_exists(dir)
229        .await
230        .context(CleanDirSnafu { dir })?
231    {
232        tokio::fs::remove_dir_all(dir)
233            .await
234            .context(CleanDirSnafu { dir })?;
235    }
236
237    Ok(())
238}
239
240/// Path provider for SST file and index file.
241pub trait FilePathProvider: Send + Sync {
242    /// Creates index file path of given file id.
243    fn build_index_file_path(&self, file_id: FileId) -> String;
244
245    /// Creates SST file path of given file id.
246    fn build_sst_file_path(&self, file_id: FileId) -> String;
247}
248
249/// Path provider that builds paths in local write cache.
250#[derive(Clone)]
251pub(crate) struct WriteCachePathProvider {
252    region_id: RegionId,
253    file_cache: FileCacheRef,
254}
255
256impl WriteCachePathProvider {
257    /// Creates a new `WriteCachePathProvider` instance.
258    pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
259        Self {
260            region_id,
261            file_cache,
262        }
263    }
264}
265
266impl FilePathProvider for WriteCachePathProvider {
267    fn build_index_file_path(&self, file_id: FileId) -> String {
268        let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
269        self.file_cache.cache_file_path(puffin_key)
270    }
271
272    fn build_sst_file_path(&self, file_id: FileId) -> String {
273        let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
274        self.file_cache.cache_file_path(parquet_file_key)
275    }
276}
277
/// [FilePathProvider] whose paths live under a region's storage directory.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    /// Directory of the region that all generated paths are based on.
    region_dir: String,
}

impl RegionFilePathFactory {
    /// Builds a factory that generates paths under `region_dir`.
    pub fn new(region_dir: String) -> Self {
        RegionFilePathFactory { region_dir }
    }
}
290
291impl FilePathProvider for RegionFilePathFactory {
292    fn build_index_file_path(&self, file_id: FileId) -> String {
293        location::index_file_path(&self.region_dir, file_id)
294    }
295
296    fn build_sst_file_path(&self, file_id: FileId) -> String {
297        location::sst_file_path(&self.region_dir, file_id)
298    }
299}