mito2/
access_layer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_stream::try_stream;
19use common_time::Timestamp;
20use futures::{Stream, TryStreamExt};
21use object_store::services::Fs;
22use object_store::util::{join_dir, with_instrument_layers};
23use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
24use smallvec::SmallVec;
25use snafu::ResultExt;
26use store_api::metadata::RegionMetadataRef;
27use store_api::region_request::PathType;
28use store_api::sst_entry::StorageSstEntry;
29use store_api::storage::{RegionId, SequenceNumber};
30
31use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
32use crate::cache::write_cache::SstUploadRequest;
33use crate::cache::CacheManagerRef;
34use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
35use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
36use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
37use crate::read::Source;
38use crate::region::options::IndexOptions;
39use crate::sst::file::{FileHandle, FileId, RegionFileId};
40use crate::sst::index::intermediate::IntermediateManager;
41use crate::sst::index::puffin_manager::PuffinManagerFactory;
42use crate::sst::index::IndexerBuilderImpl;
43use crate::sst::location::{self, region_dir_from_table_dir};
44use crate::sst::parquet::reader::ParquetReaderBuilder;
45use crate::sst::parquet::writer::ParquetWriter;
46use crate::sst::parquet::{SstInfo, WriteOptions};
47
48pub type AccessLayerRef = Arc<AccessLayer>;
49/// SST write results.
50pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
51
/// Kind of job that triggers an SST write.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteType {
    /// The write originates from a flush.
    Flush,
    /// The write originates from a compaction.
    Compaction,
}
60
61#[derive(Debug)]
62pub struct Metrics {
63    pub(crate) write_type: WriteType,
64    pub(crate) iter_source: Duration,
65    pub(crate) write_batch: Duration,
66    pub(crate) update_index: Duration,
67    pub(crate) upload_parquet: Duration,
68    pub(crate) upload_puffin: Duration,
69}
70
71impl Metrics {
72    pub(crate) fn new(write_type: WriteType) -> Self {
73        Self {
74            write_type,
75            iter_source: Default::default(),
76            write_batch: Default::default(),
77            update_index: Default::default(),
78            upload_parquet: Default::default(),
79            upload_puffin: Default::default(),
80        }
81    }
82
83    pub(crate) fn merge(mut self, other: Self) -> Self {
84        assert_eq!(self.write_type, other.write_type);
85        self.iter_source += other.iter_source;
86        self.write_batch += other.write_batch;
87        self.update_index += other.update_index;
88        self.upload_parquet += other.upload_parquet;
89        self.upload_puffin += other.upload_puffin;
90        self
91    }
92
93    pub(crate) fn observe(self) {
94        match self.write_type {
95            WriteType::Flush => {
96                FLUSH_ELAPSED
97                    .with_label_values(&["iter_source"])
98                    .observe(self.iter_source.as_secs_f64());
99                FLUSH_ELAPSED
100                    .with_label_values(&["write_batch"])
101                    .observe(self.write_batch.as_secs_f64());
102                FLUSH_ELAPSED
103                    .with_label_values(&["update_index"])
104                    .observe(self.update_index.as_secs_f64());
105                FLUSH_ELAPSED
106                    .with_label_values(&["upload_parquet"])
107                    .observe(self.upload_parquet.as_secs_f64());
108                FLUSH_ELAPSED
109                    .with_label_values(&["upload_puffin"])
110                    .observe(self.upload_puffin.as_secs_f64());
111            }
112            WriteType::Compaction => {
113                COMPACTION_STAGE_ELAPSED
114                    .with_label_values(&["iter_source"])
115                    .observe(self.iter_source.as_secs_f64());
116                COMPACTION_STAGE_ELAPSED
117                    .with_label_values(&["write_batch"])
118                    .observe(self.write_batch.as_secs_f64());
119                COMPACTION_STAGE_ELAPSED
120                    .with_label_values(&["update_index"])
121                    .observe(self.update_index.as_secs_f64());
122                COMPACTION_STAGE_ELAPSED
123                    .with_label_values(&["upload_parquet"])
124                    .observe(self.upload_parquet.as_secs_f64());
125                COMPACTION_STAGE_ELAPSED
126                    .with_label_values(&["upload_puffin"])
127                    .observe(self.upload_puffin.as_secs_f64());
128            }
129        };
130    }
131}
132
133/// A layer to access SST files under the same directory.
134pub struct AccessLayer {
135    table_dir: String,
136    /// Path type for generating file paths.
137    path_type: PathType,
138    /// Target object store.
139    object_store: ObjectStore,
140    /// Puffin manager factory for index.
141    puffin_manager_factory: PuffinManagerFactory,
142    /// Intermediate manager for inverted index.
143    intermediate_manager: IntermediateManager,
144}
145
146impl std::fmt::Debug for AccessLayer {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.debug_struct("AccessLayer")
149            .field("table_dir", &self.table_dir)
150            .finish()
151    }
152}
153
154impl AccessLayer {
155    /// Returns a new [AccessLayer] for specific `table_dir`.
156    pub fn new(
157        table_dir: impl Into<String>,
158        path_type: PathType,
159        object_store: ObjectStore,
160        puffin_manager_factory: PuffinManagerFactory,
161        intermediate_manager: IntermediateManager,
162    ) -> AccessLayer {
163        AccessLayer {
164            table_dir: table_dir.into(),
165            path_type,
166            object_store,
167            puffin_manager_factory,
168            intermediate_manager,
169        }
170    }
171
172    /// Returns the directory of the table.
173    pub fn table_dir(&self) -> &str {
174        &self.table_dir
175    }
176
177    /// Returns the object store of the layer.
178    pub fn object_store(&self) -> &ObjectStore {
179        &self.object_store
180    }
181
182    /// Returns the path type of the layer.
183    pub fn path_type(&self) -> PathType {
184        self.path_type
185    }
186
187    /// Returns the puffin manager factory.
188    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
189        &self.puffin_manager_factory
190    }
191
192    /// Returns the intermediate manager.
193    pub fn intermediate_manager(&self) -> &IntermediateManager {
194        &self.intermediate_manager
195    }
196
197    /// Deletes a SST file (and its index file if it has one) with given file id.
198    pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
199        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
200        self.object_store
201            .delete(&path)
202            .await
203            .context(DeleteSstSnafu {
204                file_id: region_file_id.file_id(),
205            })?;
206
207        let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type);
208        self.object_store
209            .delete(&path)
210            .await
211            .context(DeleteIndexSnafu {
212                file_id: region_file_id.file_id(),
213            })?;
214
215        Ok(())
216    }
217
218    /// Returns the directory of the region in the table.
219    pub fn build_region_dir(&self, region_id: RegionId) -> String {
220        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
221    }
222
223    /// Returns a reader builder for specific `file`.
224    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
225        ParquetReaderBuilder::new(
226            self.table_dir.clone(),
227            self.path_type,
228            file,
229            self.object_store.clone(),
230        )
231    }
232
233    /// Writes a SST with specific `file_id` and `metadata` to the layer.
234    ///
235    /// Returns the info of the SST. If no data written, returns None.
236    pub async fn write_sst(
237        &self,
238        request: SstWriteRequest,
239        write_opts: &WriteOptions,
240        write_type: WriteType,
241    ) -> Result<(SstInfoArray, Metrics)> {
242        let region_id = request.metadata.region_id;
243        let cache_manager = request.cache_manager.clone();
244
245        let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
246            // Write to the write cache.
247            write_cache
248                .write_and_upload_sst(
249                    request,
250                    SstUploadRequest {
251                        dest_path_provider: RegionFilePathFactory::new(
252                            self.table_dir.clone(),
253                            self.path_type,
254                        ),
255                        remote_store: self.object_store.clone(),
256                    },
257                    write_opts,
258                    write_type,
259                )
260                .await?
261        } else {
262            // Write cache is disabled.
263            let store = self.object_store.clone();
264            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
265            let indexer_builder = IndexerBuilderImpl {
266                op_type: request.op_type,
267                metadata: request.metadata.clone(),
268                row_group_size: write_opts.row_group_size,
269                puffin_manager: self
270                    .puffin_manager_factory
271                    .build(store, path_provider.clone()),
272                intermediate_manager: self.intermediate_manager.clone(),
273                index_options: request.index_options,
274                inverted_index_config: request.inverted_index_config,
275                fulltext_index_config: request.fulltext_index_config,
276                bloom_filter_index_config: request.bloom_filter_index_config,
277            };
278            // We disable write cache on file system but we still use atomic write.
279            // TODO(yingwen): If we support other non-fs stores without the write cache, then
280            // we may have find a way to check whether we need the cleaner.
281            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
282            let mut writer = ParquetWriter::new_with_object_store(
283                self.object_store.clone(),
284                request.metadata,
285                indexer_builder,
286                path_provider,
287                Metrics::new(write_type),
288            )
289            .await
290            .with_file_cleaner(cleaner);
291            let ssts = writer
292                .write_all(request.source, request.max_sequence, write_opts)
293                .await?;
294            let metrics = writer.into_metrics();
295            (ssts, metrics)
296        };
297
298        // Put parquet metadata to cache manager.
299        if !sst_info.is_empty() {
300            for sst in &sst_info {
301                if let Some(parquet_metadata) = &sst.file_metadata {
302                    cache_manager.put_parquet_meta_data(
303                        RegionFileId::new(region_id, sst.file_id),
304                        parquet_metadata.clone(),
305                    )
306                }
307            }
308        }
309
310        Ok((sst_info, metrics))
311    }
312
313    /// Lists the SST entries from the storage layer in the table directory.
314    pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
315        let object_store = self.object_store.clone();
316        let table_dir = self.table_dir.clone();
317
318        try_stream! {
319            let mut lister = object_store
320                .lister_with(table_dir.as_str())
321                .recursive(true)
322                .await
323                .context(OpenDalSnafu)?;
324
325            while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
326                let metadata = entry.metadata();
327                if metadata.is_dir() {
328                    continue;
329                }
330
331                let path = entry.path();
332                if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
333                    continue;
334                }
335
336                let file_size = metadata.content_length();
337                let file_size = if file_size == 0 { None } else { Some(file_size) };
338                let last_modified_ms = metadata
339                    .last_modified()
340                    .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));
341
342                let entry = StorageSstEntry {
343                    file_path: path.to_string(),
344                    file_size,
345                    last_modified_ms,
346                };
347
348                yield entry;
349            }
350        }
351    }
352}
353
/// Origin of an `SstWriteRequest`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The request was issued by a flush.
    Flush,
    /// The request was issued by a compaction.
    Compact,
}
360
361/// Contents to build a SST.
362pub struct SstWriteRequest {
363    pub op_type: OperationType,
364    pub metadata: RegionMetadataRef,
365    pub source: Source,
366    pub cache_manager: CacheManagerRef,
367    #[allow(dead_code)]
368    pub storage: Option<String>,
369    pub max_sequence: Option<SequenceNumber>,
370
371    /// Configs for index
372    pub index_options: IndexOptions,
373    pub inverted_index_config: InvertedIndexConfig,
374    pub fulltext_index_config: FulltextIndexConfig,
375    pub bloom_filter_index_config: BloomFilterConfig,
376}
377
378/// Cleaner to remove temp files on the atomic write dir.
379pub(crate) struct TempFileCleaner {
380    region_id: RegionId,
381    object_store: ObjectStore,
382}
383
384impl TempFileCleaner {
385    /// Constructs the cleaner for the region and store.
386    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
387        Self {
388            region_id,
389            object_store,
390        }
391    }
392
393    /// Removes the SST and index file from the local atomic dir by the file id.
394    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
395        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
396        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
397
398        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
399    }
400
401    /// Removes the files from the local atomic dir by their names.
402    pub(crate) async fn clean_atomic_dir_files(
403        local_store: &ObjectStore,
404        names_to_remove: &[&str],
405    ) {
406        // We don't know the actual suffix of the file under atomic dir, so we have
407        // to list the dir. The cost should be acceptable as there won't be to many files.
408        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
409            if e.kind() != ErrorKind::NotFound {
410                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
411            }
412        }) else {
413            return;
414        };
415
416        // In our case, we can ensure the file id is unique so it is safe to remove all files
417        // with the same file id under the atomic write dir.
418        let actual_files: Vec<_> = entries
419            .into_iter()
420            .filter_map(|entry| {
421                if entry.metadata().is_dir() {
422                    return None;
423                }
424
425                // Remove name that matches files_to_remove.
426                let should_remove = names_to_remove
427                    .iter()
428                    .any(|file| entry.name().starts_with(file));
429                if should_remove {
430                    Some(entry.path().to_string())
431                } else {
432                    None
433                }
434            })
435            .collect();
436
437        common_telemetry::warn!(
438            "Clean files {:?} under atomic write dir for {:?}",
439            actual_files,
440            names_to_remove
441        );
442
443        if let Err(e) = local_store.delete_iter(actual_files).await {
444            common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
445        }
446    }
447}
448
449pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
450    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
451    clean_dir(&atomic_write_dir).await?;
452
453    // Compatible code. Remove this after a major release.
454    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
455    clean_dir(&old_atomic_temp_dir).await?;
456
457    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
458    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
459
460    Ok(with_instrument_layers(store, false))
461}
462
463/// Clean the directory.
464async fn clean_dir(dir: &str) -> Result<()> {
465    if tokio::fs::try_exists(dir)
466        .await
467        .context(CleanDirSnafu { dir })?
468    {
469        tokio::fs::remove_dir_all(dir)
470            .await
471            .context(CleanDirSnafu { dir })?;
472    }
473
474    Ok(())
475}
476
477/// Path provider for SST file and index file.
478pub trait FilePathProvider: Send + Sync {
479    /// Creates index file path of given file id.
480    fn build_index_file_path(&self, file_id: RegionFileId) -> String;
481
482    /// Creates SST file path of given file id.
483    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
484}
485
486/// Path provider that builds paths in local write cache.
487#[derive(Clone)]
488pub(crate) struct WriteCachePathProvider {
489    file_cache: FileCacheRef,
490}
491
492impl WriteCachePathProvider {
493    /// Creates a new `WriteCachePathProvider` instance.
494    pub fn new(file_cache: FileCacheRef) -> Self {
495        Self { file_cache }
496    }
497}
498
499impl FilePathProvider for WriteCachePathProvider {
500    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
501        let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
502        self.file_cache.cache_file_path(puffin_key)
503    }
504
505    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
506        let parquet_file_key =
507            IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
508        self.file_cache.cache_file_path(parquet_file_key)
509    }
510}
511
512/// Path provider that builds paths in region storage path.
513#[derive(Clone, Debug)]
514pub(crate) struct RegionFilePathFactory {
515    pub(crate) table_dir: String,
516    pub(crate) path_type: PathType,
517}
518
519impl RegionFilePathFactory {
520    /// Creates a new `RegionFilePathFactory` instance.
521    pub fn new(table_dir: String, path_type: PathType) -> Self {
522        Self {
523            table_dir,
524            path_type,
525        }
526    }
527}
528
529impl FilePathProvider for RegionFilePathFactory {
530    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
531        location::index_file_path(&self.table_dir, file_id, self.path_type)
532    }
533
534    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
535        location::sst_file_path(&self.table_dir, file_id, self.path_type)
536    }
537}