datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! object storage utilities
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::config::ObjectStorageCacheConfig;
21use object_store::factory::new_raw_object_store;
22use object_store::layers::LruCacheLayer;
23use object_store::services::Fs;
24use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
25use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
26use snafu::prelude::*;
27
28use crate::config::ObjectStoreConfig;
29use crate::error::{self, Result};
30
31pub(crate) async fn new_object_store_without_cache(
32    store: &ObjectStoreConfig,
33    data_home: &str,
34) -> Result<ObjectStore> {
35    let object_store = new_raw_object_store(store, data_home)
36        .await
37        .context(error::ObjectStoreSnafu)?;
38    // Enable retry layer and cache layer for non-fs object storages
39    let object_store = if store.is_object_storage() {
40        // Adds retry layer
41        with_retry_layers(object_store)
42    } else {
43        object_store
44    };
45
46    let object_store = with_instrument_layers(object_store, true);
47    Ok(object_store)
48}
49
50pub(crate) async fn new_object_store(
51    store: ObjectStoreConfig,
52    data_home: &str,
53) -> Result<ObjectStore> {
54    let object_store = new_raw_object_store(&store, data_home)
55        .await
56        .context(error::ObjectStoreSnafu)?;
57    // Enable retry layer and cache layer for non-fs object storages
58    let object_store = if store.is_object_storage() {
59        let object_store = {
60            // It's safe to unwrap here because we already checked above.
61            let cache_config = store.cache_config().unwrap();
62            if let Some(cache_layer) = build_cache_layer(cache_config).await? {
63                // Adds cache layer
64                object_store.layer(cache_layer)
65            } else {
66                object_store
67            }
68        };
69
70        // Adds retry layer
71        with_retry_layers(object_store)
72    } else {
73        object_store
74    };
75
76    let object_store = with_instrument_layers(object_store, true);
77    Ok(object_store)
78}
79
80async fn build_cache_layer(
81    cache_config: &ObjectStorageCacheConfig,
82) -> Result<Option<LruCacheLayer<impl Access>>> {
83    // No need to build cache layer if read cache is disabled.
84    if !cache_config.enable_read_cache {
85        return Ok(None);
86    }
87
88    let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
89    clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
90
91    let cache_store = Fs::default()
92        .root(&cache_config.cache_path)
93        .atomic_write_dir(&atomic_temp_dir)
94        .build()
95        .context(error::BuildCacheStoreSnafu)?;
96
97    let cache_layer = LruCacheLayer::new(
98        Arc::new(cache_store),
99        cache_config.cache_capacity.0 as usize,
100    )
101    .context(error::BuildCacheStoreSnafu)?;
102    cache_layer.recover_cache(false).await;
103
104    info!(
105        "Enabled local object storage cache, path: {}, capacity: {}.",
106        cache_config.cache_path, cache_config.cache_capacity
107    );
108
109    Ok(Some(cache_layer))
110}