mito2/
access_layer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use object_store::services::Fs;
18use object_store::util::{join_dir, with_instrument_layers};
19use object_store::{ErrorKind, ObjectStore};
20use smallvec::SmallVec;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
26use crate::cache::write_cache::SstUploadRequest;
27use crate::cache::CacheManagerRef;
28use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
29use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
30use crate::read::Source;
31use crate::region::options::IndexOptions;
32use crate::sst::file::{FileHandle, FileId, FileMeta};
33use crate::sst::index::intermediate::IntermediateManager;
34use crate::sst::index::puffin_manager::PuffinManagerFactory;
35use crate::sst::index::IndexerBuilderImpl;
36use crate::sst::location;
37use crate::sst::parquet::reader::ParquetReaderBuilder;
38use crate::sst::parquet::writer::ParquetWriter;
39use crate::sst::parquet::{SstInfo, WriteOptions};
40
41pub type AccessLayerRef = Arc<AccessLayer>;
42/// SST write results.
43pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
44
45pub const ATOMIC_WRITE_DIR: &str = "tmp/";
46/// For compatibility. Remove this after a major version release.
47pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/";
48
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
    /// Directory of the region; SST and index file paths are built under it.
    region_dir: String,
    /// Target object store.
    object_store: ObjectStore,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
}
59
60impl std::fmt::Debug for AccessLayer {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("AccessLayer")
63            .field("region_dir", &self.region_dir)
64            .finish()
65    }
66}
67
68impl AccessLayer {
69    /// Returns a new [AccessLayer] for specific `region_dir`.
70    pub fn new(
71        region_dir: impl Into<String>,
72        object_store: ObjectStore,
73        puffin_manager_factory: PuffinManagerFactory,
74        intermediate_manager: IntermediateManager,
75    ) -> AccessLayer {
76        AccessLayer {
77            region_dir: region_dir.into(),
78            object_store,
79            puffin_manager_factory,
80            intermediate_manager,
81        }
82    }
83
84    /// Returns the directory of the region.
85    pub fn region_dir(&self) -> &str {
86        &self.region_dir
87    }
88
89    /// Returns the object store of the layer.
90    pub fn object_store(&self) -> &ObjectStore {
91        &self.object_store
92    }
93
94    /// Returns the puffin manager factory.
95    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
96        &self.puffin_manager_factory
97    }
98
99    /// Deletes a SST file (and its index file if it has one) with given file id.
100    pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
101        let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
102        self.object_store
103            .delete(&path)
104            .await
105            .context(DeleteSstSnafu {
106                file_id: file_meta.file_id,
107            })?;
108
109        let path = location::index_file_path(&self.region_dir, file_meta.file_id);
110        self.object_store
111            .delete(&path)
112            .await
113            .context(DeleteIndexSnafu {
114                file_id: file_meta.file_id,
115            })?;
116
117        Ok(())
118    }
119
120    /// Returns a reader builder for specific `file`.
121    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
122        ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
123    }
124
125    /// Writes a SST with specific `file_id` and `metadata` to the layer.
126    ///
127    /// Returns the info of the SST. If no data written, returns None.
128    pub async fn write_sst(
129        &self,
130        request: SstWriteRequest,
131        write_opts: &WriteOptions,
132    ) -> Result<SstInfoArray> {
133        let region_id = request.metadata.region_id;
134        let cache_manager = request.cache_manager.clone();
135
136        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
137            // Write to the write cache.
138            write_cache
139                .write_and_upload_sst(
140                    request,
141                    SstUploadRequest {
142                        dest_path_provider: RegionFilePathFactory {
143                            region_dir: self.region_dir.clone(),
144                        },
145                        remote_store: self.object_store.clone(),
146                    },
147                    write_opts,
148                )
149                .await?
150        } else {
151            // Write cache is disabled.
152            let store = self.object_store.clone();
153            let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
154            let indexer_builder = IndexerBuilderImpl {
155                op_type: request.op_type,
156                metadata: request.metadata.clone(),
157                row_group_size: write_opts.row_group_size,
158                puffin_manager: self
159                    .puffin_manager_factory
160                    .build(store, path_provider.clone()),
161                intermediate_manager: self.intermediate_manager.clone(),
162                index_options: request.index_options,
163                inverted_index_config: request.inverted_index_config,
164                fulltext_index_config: request.fulltext_index_config,
165                bloom_filter_index_config: request.bloom_filter_index_config,
166            };
167            // We disable write cache on file system but we still use atomic write.
168            // TODO(yingwen): If we support other non-fs stores without the write cache, then
169            // we may have find a way to check whether we need the cleaner.
170            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
171            let mut writer = ParquetWriter::new_with_object_store(
172                self.object_store.clone(),
173                request.metadata,
174                indexer_builder,
175                path_provider,
176            )
177            .await
178            .with_file_cleaner(cleaner);
179            writer
180                .write_all(request.source, request.max_sequence, write_opts)
181                .await?
182        };
183
184        // Put parquet metadata to cache manager.
185        if !sst_info.is_empty() {
186            for sst in &sst_info {
187                if let Some(parquet_metadata) = &sst.file_metadata {
188                    cache_manager.put_parquet_meta_data(
189                        region_id,
190                        sst.file_id,
191                        parquet_metadata.clone(),
192                    )
193                }
194            }
195        }
196
197        Ok(sst_info)
198    }
199}
200
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The request originates from a flush.
    Flush,
    /// The request originates from a compaction.
    Compact,
}
207
/// Contents to build a SST.
pub struct SstWriteRequest {
    /// Origin of this request; forwarded to the indexer builder.
    pub op_type: OperationType,
    /// Metadata of the region the SST belongs to.
    pub metadata: RegionMetadataRef,
    /// Source of the rows to write.
    pub source: Source,
    /// Cache manager. Its write cache (if any) decides the write path in
    /// `AccessLayer::write_sst`, and written parquet metadata is put into it.
    pub cache_manager: CacheManagerRef,
    // NOTE(review): not read anywhere in this file, which is presumably why
    // `dead_code` is allowed; likely reserved for selecting a storage — confirm
    // before removing the allow or the field.
    #[allow(dead_code)]
    pub storage: Option<String>,
    /// Optional max sequence; passed to `ParquetWriter::write_all` on the
    /// direct-write path.
    pub max_sequence: Option<SequenceNumber>,

    /// Configs for index
    pub index_options: IndexOptions,
    pub inverted_index_config: InvertedIndexConfig,
    pub fulltext_index_config: FulltextIndexConfig,
    pub bloom_filter_index_config: BloomFilterConfig,
}
224
225/// Cleaner to remove temp files on the atomic write dir.
226pub(crate) struct TempFileCleaner {
227    region_id: RegionId,
228    object_store: ObjectStore,
229}
230
231impl TempFileCleaner {
232    /// Constructs the cleaner for the region and store.
233    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
234        Self {
235            region_id,
236            object_store,
237        }
238    }
239
240    /// Removes the SST and index file from the local atomic dir by the file id.
241    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
242        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
243        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
244
245        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
246    }
247
248    /// Removes the files from the local atomic dir by their names.
249    pub(crate) async fn clean_atomic_dir_files(
250        local_store: &ObjectStore,
251        names_to_remove: &[&str],
252    ) {
253        // We don't know the actual suffix of the file under atomic dir, so we have
254        // to list the dir. The cost should be acceptable as there won't be to many files.
255        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
256            if e.kind() != ErrorKind::NotFound {
257                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
258            }
259        }) else {
260            return;
261        };
262
263        // In our case, we can ensure the file id is unique so it is safe to remove all files
264        // with the same file id under the atomic write dir.
265        let actual_files: Vec<_> = entries
266            .into_iter()
267            .filter_map(|entry| {
268                if entry.metadata().is_dir() {
269                    return None;
270                }
271
272                // Remove name that matches files_to_remove.
273                let should_remove = names_to_remove
274                    .iter()
275                    .any(|file| entry.name().starts_with(file));
276                if should_remove {
277                    Some(entry.path().to_string())
278                } else {
279                    None
280                }
281            })
282            .collect();
283
284        common_telemetry::warn!(
285            "Clean files {:?} under atomic write dir for {:?}",
286            actual_files,
287            names_to_remove
288        );
289
290        if let Err(e) = local_store.delete_iter(actual_files).await {
291            common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
292        }
293    }
294}
295
296pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
297    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
298    clean_dir(&atomic_write_dir).await?;
299
300    // Compatible code. Remove this after a major release.
301    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
302    clean_dir(&old_atomic_temp_dir).await?;
303
304    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
305    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
306
307    Ok(with_instrument_layers(store, false))
308}
309
310/// Clean the directory.
311async fn clean_dir(dir: &str) -> Result<()> {
312    if tokio::fs::try_exists(dir)
313        .await
314        .context(CleanDirSnafu { dir })?
315    {
316        tokio::fs::remove_dir_all(dir)
317            .await
318            .context(CleanDirSnafu { dir })?;
319    }
320
321    Ok(())
322}
323
324/// Path provider for SST file and index file.
325pub trait FilePathProvider: Send + Sync {
326    /// Creates index file path of given file id.
327    fn build_index_file_path(&self, file_id: FileId) -> String;
328
329    /// Creates SST file path of given file id.
330    fn build_sst_file_path(&self, file_id: FileId) -> String;
331}
332
333/// Path provider that builds paths in local write cache.
334#[derive(Clone)]
335pub(crate) struct WriteCachePathProvider {
336    region_id: RegionId,
337    file_cache: FileCacheRef,
338}
339
340impl WriteCachePathProvider {
341    /// Creates a new `WriteCachePathProvider` instance.
342    pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
343        Self {
344            region_id,
345            file_cache,
346        }
347    }
348}
349
350impl FilePathProvider for WriteCachePathProvider {
351    fn build_index_file_path(&self, file_id: FileId) -> String {
352        let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
353        self.file_cache.cache_file_path(puffin_key)
354    }
355
356    fn build_sst_file_path(&self, file_id: FileId) -> String {
357        let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
358        self.file_cache.cache_file_path(parquet_file_key)
359    }
360}
361
362/// Path provider that builds paths in region storage path.
363#[derive(Clone, Debug)]
364pub(crate) struct RegionFilePathFactory {
365    pub(crate) region_dir: String,
366}
367
368impl RegionFilePathFactory {
369    /// Creates a new `RegionFilePathFactory` instance.
370    pub fn new(region_dir: String) -> Self {
371        Self { region_dir }
372    }
373}
374
375impl FilePathProvider for RegionFilePathFactory {
376    fn build_index_file_path(&self, file_id: FileId) -> String {
377        location::index_file_path(&self.region_dir, file_id)
378    }
379
380    fn build_sst_file_path(&self, file_id: FileId) -> String {
381        location::sst_file_path(&self.region_dir, file_id)
382    }
383}