datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! object storage utilities
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::config::ObjectStorageCacheConfig;
21use object_store::factory::new_raw_object_store;
22use object_store::layers::LruCacheLayer;
23use object_store::services::Fs;
24use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
25use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
26use snafu::prelude::*;
27
28use crate::config::ObjectStoreConfig;
29use crate::error::{self, Result};
30
31pub(crate) async fn new_object_store_without_cache(
32    store: &ObjectStoreConfig,
33    data_home: &str,
34) -> Result<ObjectStore> {
35    let object_store = new_raw_object_store(store, data_home)
36        .await
37        .context(error::ObjectStoreSnafu)?;
38    // Enable retry layer and cache layer for non-fs object storages
39    let object_store = if store.is_object_storage() {
40        // Adds retry layer
41        with_retry_layers(object_store)
42    } else {
43        object_store
44    };
45
46    let object_store = with_instrument_layers(object_store, true);
47    Ok(object_store)
48}
49
50pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
51    let object_store = new_raw_object_store(&store, data_home)
52        .await
53        .context(error::ObjectStoreSnafu)?;
54    // Enable retry layer and cache layer for non-fs object storages
55    let object_store = if store.is_object_storage() {
56        let object_store = {
57            // It's safe to unwrap here because we already checked above.
58            let cache_config = store.cache_config().unwrap();
59            if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
60                // Adds cache layer
61                object_store.layer(cache_layer)
62            } else {
63                object_store
64            }
65        };
66
67        // Adds retry layer
68        with_retry_layers(object_store)
69    } else {
70        object_store
71    };
72
73    let object_store = with_instrument_layers(object_store, true);
74    Ok(object_store)
75}
76
77async fn build_cache_layer(
78    cache_config: &ObjectStorageCacheConfig,
79    data_home: &str,
80) -> Result<Option<LruCacheLayer<impl Access>>> {
81    // No need to build cache layer if read cache is disabled.
82    if !cache_config.enable_read_cache {
83        return Ok(None);
84    }
85    let cache_base_dir = if cache_config.cache_path.is_empty() {
86        data_home
87    } else {
88        &cache_config.cache_path
89    };
90    let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
91    clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
92
93    let cache_store = Fs::default()
94        .root(cache_base_dir)
95        .atomic_write_dir(&atomic_temp_dir)
96        .build()
97        .context(error::BuildCacheStoreSnafu)?;
98
99    let cache_layer = LruCacheLayer::new(
100        Arc::new(cache_store),
101        cache_config.cache_capacity.0 as usize,
102    )
103    .context(error::BuildCacheStoreSnafu)?;
104    cache_layer.recover_cache(false).await;
105
106    info!(
107        "Enabled local object storage cache, path: {}, capacity: {}.",
108        cache_config.cache_path, cache_config.cache_capacity
109    );
110
111    Ok(Some(cache_layer))
112}