datanode/store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Object storage utilities.

mod azblob;
pub mod fs;
mod gcs;
mod oss;
mod s3;

use std::path;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};

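/// Builds a raw, layer-free [`ObjectStore`] for the configured backend
/// (fs, S3, OSS, Azblob, or GCS), passing the normalized `data_home` to the fs backend.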
pub(crate) async fn new_raw_object_store(
    store: &ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let data_home = normalize_dir(data_home);
    let object_store = match store {
        ObjectStoreConfig::File(file_config) => {
            fs::new_fs_object_store(&data_home, file_config).await
        }
        ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
        ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
        ObjectStoreConfig::Azblob(azblob_config) => {
            azblob::new_azblob_object_store(azblob_config).await
        }
        ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
    }?;
    Ok(object_store)
}

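/// Wraps an [`ObjectStore`] with a [`RetryLayer`] that uses jitter and reports
/// each retried error through [`PrintDetailedError`].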
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
    object_store.layer(
        RetryLayer::new()
            .with_jitter()
            .with_notify(PrintDetailedError),
    )
}

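/// Builds an [`ObjectStore`] with the retry and instrument layers but without the
/// local read cache; the retry layer is only applied to non-fs object storages.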
pub(crate) async fn new_object_store_without_cache(
    store: &ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let object_store = new_raw_object_store(store, data_home).await?;
    // Enable the retry layer for non-fs object storages
    let object_store = if store.is_object_storage() {
        // Adds retry layer
        with_retry_layers(object_store)
    } else {
        object_store
    };

    let object_store = with_instrument_layers(object_store, true);
    Ok(object_store)
}

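/// Builds the fully layered [`ObjectStore`]: non-fs object storages additionally get
/// the local read cache (when enabled) and the retry layer; every store gets the
/// instrument layers.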
pub(crate) async fn new_object_store(
    store: ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let object_store = new_raw_object_store(&store, data_home).await?;
    // Enable retry layer and cache layer for non-fs object storages
    let object_store = if store.is_object_storage() {
        let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
            // Adds cache layer
            object_store.layer(cache_layer)
        } else {
            object_store
        };

        // Adds retry layer
        with_retry_layers(object_store)
    } else {
        object_store
    };

    let object_store = with_instrument_layers(object_store, true);
    Ok(object_store)
}

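/// Builds the local [`LruCacheLayer`] for a remote object storage, backed by an fs store.
/// The cache path defaults to `data_home` when unset; a blank path disables the cache
/// and `None` is returned.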
async fn build_cache_layer(
    store_config: &ObjectStoreConfig,
    data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
    let (name, mut cache_path, cache_capacity) = match store_config {
        ObjectStoreConfig::S3(s3_config) => {
            let path = s3_config.cache.cache_path.clone();
            let name = &s3_config.name;
            let capacity = s3_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (name, path, capacity)
        }
        ObjectStoreConfig::Oss(oss_config) => {
            let path = oss_config.cache.cache_path.clone();
            let name = &oss_config.name;
            let capacity = oss_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (name, path, capacity)
        }
        ObjectStoreConfig::Azblob(azblob_config) => {
            let path = azblob_config.cache.cache_path.clone();
            let name = &azblob_config.name;
            let capacity = azblob_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (name, path, capacity)
        }
        ObjectStoreConfig::Gcs(gcs_config) => {
            let path = gcs_config.cache.cache_path.clone();
            let name = &gcs_config.name;
            let capacity = gcs_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (name, path, capacity)
        }
        _ => unreachable!("Already checked above"),
    };

    // The object cache is enabled by default. If `cache_path` is not set,
    // it defaults to `${data_home}`.
    if cache_path.is_none() {
        let read_cache_path = data_home.to_string();
        tokio::fs::create_dir_all(Path::new(&read_cache_path))
            .await
            .context(CreateDirSnafu {
                dir: &read_cache_path,
            })?;

        info!(
            "The object storage cache path is not set for '{}', using the default path: '{}'",
            name, &read_cache_path
        );

        cache_path = Some(read_cache_path);
    }

    if let Some(path) = cache_path.as_ref()
        && !path.trim().is_empty()
    {
        let atomic_temp_dir = join_dir(path, ".tmp/");
        clean_temp_dir(&atomic_temp_dir)?;

        let cache_store = Fs::default()
            .root(path)
            .atomic_write_dir(&atomic_temp_dir)
            .build()
            .context(error::InitBackendSnafu)?;

        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
            .context(error::InitBackendSnafu)?;
        cache_layer.recover_cache(false).await;
        info!(
            "Enabled local object storage cache, path: {}, capacity: {}.",
            path, cache_capacity
        );

        Ok(Some(cache_layer))
    } else {
        Ok(None)
    }
}

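/// Removes the given temporary directory and all of its contents if it exists.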
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
    if path::Path::new(&dir).exists() {
        info!("Begin to clean temp storage directory: {}", dir);
        std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
        info!("Cleaned temp storage directory: {}", dir);
    }

    Ok(())
}

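/// Builds an [`HttpClient`] from the timeout and connection-pool settings
/// in [`HttpClientConfig`].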
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
    let client = reqwest::ClientBuilder::new()
        .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
        .connect_timeout(config.connect_timeout)
        .pool_idle_timeout(config.pool_idle_timeout)
        .timeout(config.timeout)
        .build()
        .context(BuildHttpClientSnafu)?;
    Ok(HttpClient::with(client))
}

/// A retry interceptor that logs the error in `Debug` format on each retry attempt.
struct PrintDetailedError;

impl RetryInterceptor for PrintDetailedError {
    fn intercept(&self, err: &Error, dur: Duration) {
        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
    }
}