datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Object storage utilities: builds object stores with retry, local cache and instrument layers.
16
17use std::path::Path;
18use std::sync::Arc;
19use std::time::Duration;
20
21use common_telemetry::{info, warn};
22use object_store::factory::new_raw_object_store;
23use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
24use object_store::services::Fs;
25use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
26use object_store::{
27    Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
28};
29use snafu::prelude::*;
30
31use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
32use crate::error::{self, CreateDirSnafu, Result};
33
34fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
35    object_store.layer(
36        RetryLayer::new()
37            .with_jitter()
38            .with_notify(PrintDetailedError),
39    )
40}
41
42pub(crate) async fn new_object_store_without_cache(
43    store: &ObjectStoreConfig,
44    data_home: &str,
45) -> Result<ObjectStore> {
46    let object_store = new_raw_object_store(store, data_home)
47        .await
48        .context(error::ObjectStoreSnafu)?;
49    // Enable retry layer and cache layer for non-fs object storages
50    let object_store = if store.is_object_storage() {
51        // Adds retry layer
52        with_retry_layers(object_store)
53    } else {
54        object_store
55    };
56
57    let object_store = with_instrument_layers(object_store, true);
58    Ok(object_store)
59}
60
61pub(crate) async fn new_object_store(
62    store: ObjectStoreConfig,
63    data_home: &str,
64) -> Result<ObjectStore> {
65    let object_store = new_raw_object_store(&store, data_home)
66        .await
67        .context(error::ObjectStoreSnafu)?;
68    // Enable retry layer and cache layer for non-fs object storages
69    let object_store = if store.is_object_storage() {
70        let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
71            // Adds cache layer
72            object_store.layer(cache_layer)
73        } else {
74            object_store
75        };
76
77        // Adds retry layer
78        with_retry_layers(object_store)
79    } else {
80        object_store
81    };
82
83    let object_store = with_instrument_layers(object_store, true);
84    Ok(object_store)
85}
86
87async fn build_cache_layer(
88    store_config: &ObjectStoreConfig,
89    data_home: &str,
90) -> Result<Option<LruCacheLayer<impl Access>>> {
91    let (name, mut cache_path, cache_capacity) = match store_config {
92        ObjectStoreConfig::S3(s3_config) => {
93            let path = s3_config.cache.cache_path.clone();
94            let name = &s3_config.name;
95            let capacity = s3_config
96                .cache
97                .cache_capacity
98                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
99            (name, path, capacity)
100        }
101        ObjectStoreConfig::Oss(oss_config) => {
102            let path = oss_config.cache.cache_path.clone();
103            let name = &oss_config.name;
104            let capacity = oss_config
105                .cache
106                .cache_capacity
107                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
108            (name, path, capacity)
109        }
110        ObjectStoreConfig::Azblob(azblob_config) => {
111            let path = azblob_config.cache.cache_path.clone();
112            let name = &azblob_config.name;
113            let capacity = azblob_config
114                .cache
115                .cache_capacity
116                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
117            (name, path, capacity)
118        }
119        ObjectStoreConfig::Gcs(gcs_config) => {
120            let path = gcs_config.cache.cache_path.clone();
121            let name = &gcs_config.name;
122            let capacity = gcs_config
123                .cache
124                .cache_capacity
125                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
126            (name, path, capacity)
127        }
128        _ => unreachable!("Already checked above"),
129    };
130
131    // Enable object cache by default
132    // Set the cache_path to be `${data_home}` by default
133    // if it's not present
134    if cache_path.is_none() {
135        let read_cache_path = data_home.to_string();
136        tokio::fs::create_dir_all(Path::new(&read_cache_path))
137            .await
138            .context(CreateDirSnafu {
139                dir: &read_cache_path,
140            })?;
141
142        info!(
143            "The object storage cache path is not set for '{}', using the default path: '{}'",
144            name, &read_cache_path
145        );
146
147        cache_path = Some(read_cache_path);
148    }
149
150    if let Some(path) = cache_path.as_ref()
151        && !path.trim().is_empty()
152    {
153        let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
154        clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
155
156        // Compatible code. Remove this after a major release.
157        let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
158        clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
159
160        let cache_store = Fs::default()
161            .root(path)
162            .atomic_write_dir(&atomic_temp_dir)
163            .build()
164            .context(error::BuildCacheStoreSnafu)?;
165
166        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
167            .context(error::BuildCacheStoreSnafu)?;
168        cache_layer.recover_cache(false).await;
169        info!(
170            "Enabled local object storage cache, path: {}, capacity: {}.",
171            path, cache_capacity
172        );
173
174        Ok(Some(cache_layer))
175    } else {
176        Ok(None)
177    }
178}
179
180struct PrintDetailedError;
181
182// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
183impl RetryInterceptor for PrintDetailedError {
184    fn intercept(&self, err: &Error, dur: Duration) {
185        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
186    }
187}