mito2/
access_layer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use object_store::services::Fs;
19use object_store::util::{join_dir, with_instrument_layers};
20use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
21use smallvec::SmallVec;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{RegionId, SequenceNumber};
26
27use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
28use crate::cache::write_cache::SstUploadRequest;
29use crate::cache::CacheManagerRef;
30use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
31use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
32use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
33use crate::read::Source;
34use crate::region::options::IndexOptions;
35use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId};
36use crate::sst::index::intermediate::IntermediateManager;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::location::{self, region_dir_from_table_dir};
40use crate::sst::parquet::reader::ParquetReaderBuilder;
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43
44pub type AccessLayerRef = Arc<AccessLayer>;
45/// SST write results.
46pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
47
/// Kind of write that produced an SST.
///
/// `Metrics::observe` uses it to choose between the flush and the
/// compaction histograms.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteType {
    /// The SST is written by a flush.
    Flush,
    /// The SST is written by a compaction.
    Compaction,
}
56
57#[derive(Debug)]
58pub struct Metrics {
59    pub(crate) write_type: WriteType,
60    pub(crate) iter_source: Duration,
61    pub(crate) write_batch: Duration,
62    pub(crate) update_index: Duration,
63    pub(crate) upload_parquet: Duration,
64    pub(crate) upload_puffin: Duration,
65}
66
67impl Metrics {
68    pub(crate) fn new(write_type: WriteType) -> Self {
69        Self {
70            write_type,
71            iter_source: Default::default(),
72            write_batch: Default::default(),
73            update_index: Default::default(),
74            upload_parquet: Default::default(),
75            upload_puffin: Default::default(),
76        }
77    }
78
79    pub(crate) fn merge(mut self, other: Self) -> Self {
80        assert_eq!(self.write_type, other.write_type);
81        self.iter_source += other.iter_source;
82        self.write_batch += other.write_batch;
83        self.update_index += other.update_index;
84        self.upload_parquet += other.upload_parquet;
85        self.upload_puffin += other.upload_puffin;
86        self
87    }
88
89    pub(crate) fn observe(self) {
90        match self.write_type {
91            WriteType::Flush => {
92                FLUSH_ELAPSED
93                    .with_label_values(&["iter_source"])
94                    .observe(self.iter_source.as_secs_f64());
95                FLUSH_ELAPSED
96                    .with_label_values(&["write_batch"])
97                    .observe(self.write_batch.as_secs_f64());
98                FLUSH_ELAPSED
99                    .with_label_values(&["update_index"])
100                    .observe(self.update_index.as_secs_f64());
101                FLUSH_ELAPSED
102                    .with_label_values(&["upload_parquet"])
103                    .observe(self.upload_parquet.as_secs_f64());
104                FLUSH_ELAPSED
105                    .with_label_values(&["upload_puffin"])
106                    .observe(self.upload_puffin.as_secs_f64());
107            }
108            WriteType::Compaction => {
109                COMPACTION_STAGE_ELAPSED
110                    .with_label_values(&["iter_source"])
111                    .observe(self.iter_source.as_secs_f64());
112                COMPACTION_STAGE_ELAPSED
113                    .with_label_values(&["write_batch"])
114                    .observe(self.write_batch.as_secs_f64());
115                COMPACTION_STAGE_ELAPSED
116                    .with_label_values(&["update_index"])
117                    .observe(self.update_index.as_secs_f64());
118                COMPACTION_STAGE_ELAPSED
119                    .with_label_values(&["upload_parquet"])
120                    .observe(self.upload_parquet.as_secs_f64());
121                COMPACTION_STAGE_ELAPSED
122                    .with_label_values(&["upload_puffin"])
123                    .observe(self.upload_puffin.as_secs_f64());
124            }
125        };
126    }
127}
128
129/// A layer to access SST files under the same directory.
130pub struct AccessLayer {
131    table_dir: String,
132    /// Path type for generating file paths.
133    path_type: PathType,
134    /// Target object store.
135    object_store: ObjectStore,
136    /// Puffin manager factory for index.
137    puffin_manager_factory: PuffinManagerFactory,
138    /// Intermediate manager for inverted index.
139    intermediate_manager: IntermediateManager,
140}
141
142impl std::fmt::Debug for AccessLayer {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("AccessLayer")
145            .field("table_dir", &self.table_dir)
146            .finish()
147    }
148}
149
150impl AccessLayer {
151    /// Returns a new [AccessLayer] for specific `table_dir`.
152    pub fn new(
153        table_dir: impl Into<String>,
154        path_type: PathType,
155        object_store: ObjectStore,
156        puffin_manager_factory: PuffinManagerFactory,
157        intermediate_manager: IntermediateManager,
158    ) -> AccessLayer {
159        AccessLayer {
160            table_dir: table_dir.into(),
161            path_type,
162            object_store,
163            puffin_manager_factory,
164            intermediate_manager,
165        }
166    }
167
168    /// Returns the directory of the table.
169    pub fn table_dir(&self) -> &str {
170        &self.table_dir
171    }
172
173    /// Returns the object store of the layer.
174    pub fn object_store(&self) -> &ObjectStore {
175        &self.object_store
176    }
177
178    /// Returns the path type of the layer.
179    pub fn path_type(&self) -> PathType {
180        self.path_type
181    }
182
183    /// Returns the puffin manager factory.
184    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
185        &self.puffin_manager_factory
186    }
187
188    /// Deletes a SST file (and its index file if it has one) with given file id.
189    pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
190        let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
191        self.object_store
192            .delete(&path)
193            .await
194            .context(DeleteSstSnafu {
195                file_id: file_meta.file_id,
196            })?;
197
198        let path = location::index_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
199        self.object_store
200            .delete(&path)
201            .await
202            .context(DeleteIndexSnafu {
203                file_id: file_meta.file_id,
204            })?;
205
206        Ok(())
207    }
208
209    /// Returns the directory of the region in the table.
210    pub fn build_region_dir(&self, region_id: RegionId) -> String {
211        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
212    }
213
214    /// Returns a reader builder for specific `file`.
215    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
216        ParquetReaderBuilder::new(
217            self.table_dir.clone(),
218            self.path_type,
219            file,
220            self.object_store.clone(),
221        )
222    }
223
224    /// Writes a SST with specific `file_id` and `metadata` to the layer.
225    ///
226    /// Returns the info of the SST. If no data written, returns None.
227    pub async fn write_sst(
228        &self,
229        request: SstWriteRequest,
230        write_opts: &WriteOptions,
231        write_type: WriteType,
232    ) -> Result<(SstInfoArray, Metrics)> {
233        let region_id = request.metadata.region_id;
234        let cache_manager = request.cache_manager.clone();
235
236        let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
237            // Write to the write cache.
238            write_cache
239                .write_and_upload_sst(
240                    request,
241                    SstUploadRequest {
242                        dest_path_provider: RegionFilePathFactory {
243                            table_dir: self.table_dir.clone(),
244                            path_type: self.path_type,
245                        },
246                        remote_store: self.object_store.clone(),
247                    },
248                    write_opts,
249                    write_type,
250                )
251                .await?
252        } else {
253            // Write cache is disabled.
254            let store = self.object_store.clone();
255            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
256            let indexer_builder = IndexerBuilderImpl {
257                op_type: request.op_type,
258                metadata: request.metadata.clone(),
259                row_group_size: write_opts.row_group_size,
260                puffin_manager: self
261                    .puffin_manager_factory
262                    .build(store, path_provider.clone()),
263                intermediate_manager: self.intermediate_manager.clone(),
264                index_options: request.index_options,
265                inverted_index_config: request.inverted_index_config,
266                fulltext_index_config: request.fulltext_index_config,
267                bloom_filter_index_config: request.bloom_filter_index_config,
268            };
269            // We disable write cache on file system but we still use atomic write.
270            // TODO(yingwen): If we support other non-fs stores without the write cache, then
271            // we may have find a way to check whether we need the cleaner.
272            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
273            let mut writer = ParquetWriter::new_with_object_store(
274                self.object_store.clone(),
275                request.metadata,
276                indexer_builder,
277                path_provider,
278                Metrics::new(write_type),
279            )
280            .await
281            .with_file_cleaner(cleaner);
282            let ssts = writer
283                .write_all(request.source, request.max_sequence, write_opts)
284                .await?;
285            let metrics = writer.into_metrics();
286            (ssts, metrics)
287        };
288
289        // Put parquet metadata to cache manager.
290        if !sst_info.is_empty() {
291            for sst in &sst_info {
292                if let Some(parquet_metadata) = &sst.file_metadata {
293                    cache_manager.put_parquet_meta_data(
294                        RegionFileId::new(region_id, sst.file_id),
295                        parquet_metadata.clone(),
296                    )
297                }
298            }
299        }
300
301        Ok((sst_info, metrics))
302    }
303}
304
/// Origin of an `SstWriteRequest`: which operation asked for the SST.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The SST is requested by a flush.
    Flush,
    /// The SST is requested by a compaction.
    Compact,
}
311
312/// Contents to build a SST.
313pub struct SstWriteRequest {
314    pub op_type: OperationType,
315    pub metadata: RegionMetadataRef,
316    pub source: Source,
317    pub cache_manager: CacheManagerRef,
318    #[allow(dead_code)]
319    pub storage: Option<String>,
320    pub max_sequence: Option<SequenceNumber>,
321
322    /// Configs for index
323    pub index_options: IndexOptions,
324    pub inverted_index_config: InvertedIndexConfig,
325    pub fulltext_index_config: FulltextIndexConfig,
326    pub bloom_filter_index_config: BloomFilterConfig,
327}
328
329/// Cleaner to remove temp files on the atomic write dir.
330pub(crate) struct TempFileCleaner {
331    region_id: RegionId,
332    object_store: ObjectStore,
333}
334
335impl TempFileCleaner {
336    /// Constructs the cleaner for the region and store.
337    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
338        Self {
339            region_id,
340            object_store,
341        }
342    }
343
344    /// Removes the SST and index file from the local atomic dir by the file id.
345    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
346        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
347        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
348
349        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
350    }
351
352    /// Removes the files from the local atomic dir by their names.
353    pub(crate) async fn clean_atomic_dir_files(
354        local_store: &ObjectStore,
355        names_to_remove: &[&str],
356    ) {
357        // We don't know the actual suffix of the file under atomic dir, so we have
358        // to list the dir. The cost should be acceptable as there won't be to many files.
359        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
360            if e.kind() != ErrorKind::NotFound {
361                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
362            }
363        }) else {
364            return;
365        };
366
367        // In our case, we can ensure the file id is unique so it is safe to remove all files
368        // with the same file id under the atomic write dir.
369        let actual_files: Vec<_> = entries
370            .into_iter()
371            .filter_map(|entry| {
372                if entry.metadata().is_dir() {
373                    return None;
374                }
375
376                // Remove name that matches files_to_remove.
377                let should_remove = names_to_remove
378                    .iter()
379                    .any(|file| entry.name().starts_with(file));
380                if should_remove {
381                    Some(entry.path().to_string())
382                } else {
383                    None
384                }
385            })
386            .collect();
387
388        common_telemetry::warn!(
389            "Clean files {:?} under atomic write dir for {:?}",
390            actual_files,
391            names_to_remove
392        );
393
394        if let Err(e) = local_store.delete_iter(actual_files).await {
395            common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
396        }
397    }
398}
399
400pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
401    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
402    clean_dir(&atomic_write_dir).await?;
403
404    // Compatible code. Remove this after a major release.
405    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
406    clean_dir(&old_atomic_temp_dir).await?;
407
408    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
409    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
410
411    Ok(with_instrument_layers(store, false))
412}
413
414/// Clean the directory.
415async fn clean_dir(dir: &str) -> Result<()> {
416    if tokio::fs::try_exists(dir)
417        .await
418        .context(CleanDirSnafu { dir })?
419    {
420        tokio::fs::remove_dir_all(dir)
421            .await
422            .context(CleanDirSnafu { dir })?;
423    }
424
425    Ok(())
426}
427
428/// Path provider for SST file and index file.
429pub trait FilePathProvider: Send + Sync {
430    /// Creates index file path of given file id.
431    fn build_index_file_path(&self, file_id: RegionFileId) -> String;
432
433    /// Creates SST file path of given file id.
434    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
435}
436
437/// Path provider that builds paths in local write cache.
438#[derive(Clone)]
439pub(crate) struct WriteCachePathProvider {
440    file_cache: FileCacheRef,
441}
442
443impl WriteCachePathProvider {
444    /// Creates a new `WriteCachePathProvider` instance.
445    pub fn new(file_cache: FileCacheRef) -> Self {
446        Self { file_cache }
447    }
448}
449
450impl FilePathProvider for WriteCachePathProvider {
451    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
452        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
453        self.file_cache.cache_file_path(puffin_key)
454    }
455
456    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
457        let parquet_file_key =
458            IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
459        self.file_cache.cache_file_path(parquet_file_key)
460    }
461}
462
463/// Path provider that builds paths in region storage path.
464#[derive(Clone, Debug)]
465pub(crate) struct RegionFilePathFactory {
466    pub(crate) table_dir: String,
467    pub(crate) path_type: PathType,
468}
469
470impl RegionFilePathFactory {
471    /// Creates a new `RegionFilePathFactory` instance.
472    pub fn new(table_dir: String, path_type: PathType) -> Self {
473        Self {
474            table_dir,
475            path_type,
476        }
477    }
478}
479
480impl FilePathProvider for RegionFilePathFactory {
481    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
482        location::index_file_path(&self.table_dir, file_id, self.path_type)
483    }
484
485    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
486        location::sst_file_path(&self.table_dir, file_id, self.path_type)
487    }
488}