datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Object storage utilities: backend construction, retry/cache layering and HTTP client setup.
16
17mod azblob;
18pub mod fs;
19mod gcs;
20mod oss;
21mod s3;
22use std::path;
23use std::path::Path;
24use std::sync::Arc;
25use std::time::Duration;
26
27use common_telemetry::{info, warn};
28use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
29use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
30use object_store::services::Fs;
31use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
32use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
33use snafu::prelude::*;
34
35use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
36use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
37
38pub(crate) async fn new_raw_object_store(
39    store: &ObjectStoreConfig,
40    data_home: &str,
41) -> Result<ObjectStore> {
42    let data_home = normalize_dir(data_home);
43    let object_store = match store {
44        ObjectStoreConfig::File(file_config) => {
45            fs::new_fs_object_store(&data_home, file_config).await
46        }
47        ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
48        ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
49        ObjectStoreConfig::Azblob(azblob_config) => {
50            azblob::new_azblob_object_store(azblob_config).await
51        }
52        ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
53    }?;
54    Ok(object_store)
55}
56
57fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
58    object_store.layer(
59        RetryLayer::new()
60            .with_jitter()
61            .with_notify(PrintDetailedError),
62    )
63}
64
65pub(crate) async fn new_object_store_without_cache(
66    store: &ObjectStoreConfig,
67    data_home: &str,
68) -> Result<ObjectStore> {
69    let object_store = new_raw_object_store(store, data_home).await?;
70    // Enable retry layer and cache layer for non-fs object storages
71    let object_store = if store.is_object_storage() {
72        // Adds retry layer
73        with_retry_layers(object_store)
74    } else {
75        object_store
76    };
77
78    let object_store = with_instrument_layers(object_store, true);
79    Ok(object_store)
80}
81
82pub(crate) async fn new_object_store(
83    store: ObjectStoreConfig,
84    data_home: &str,
85) -> Result<ObjectStore> {
86    let object_store = new_raw_object_store(&store, data_home).await?;
87    // Enable retry layer and cache layer for non-fs object storages
88    let object_store = if store.is_object_storage() {
89        let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
90            // Adds cache layer
91            object_store.layer(cache_layer)
92        } else {
93            object_store
94        };
95
96        // Adds retry layer
97        with_retry_layers(object_store)
98    } else {
99        object_store
100    };
101
102    let object_store = with_instrument_layers(object_store, true);
103    Ok(object_store)
104}
105
106async fn build_cache_layer(
107    store_config: &ObjectStoreConfig,
108    data_home: &str,
109) -> Result<Option<LruCacheLayer<impl Access>>> {
110    let (name, mut cache_path, cache_capacity) = match store_config {
111        ObjectStoreConfig::S3(s3_config) => {
112            let path = s3_config.cache.cache_path.clone();
113            let name = &s3_config.name;
114            let capacity = s3_config
115                .cache
116                .cache_capacity
117                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
118            (name, path, capacity)
119        }
120        ObjectStoreConfig::Oss(oss_config) => {
121            let path = oss_config.cache.cache_path.clone();
122            let name = &oss_config.name;
123            let capacity = oss_config
124                .cache
125                .cache_capacity
126                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
127            (name, path, capacity)
128        }
129        ObjectStoreConfig::Azblob(azblob_config) => {
130            let path = azblob_config.cache.cache_path.clone();
131            let name = &azblob_config.name;
132            let capacity = azblob_config
133                .cache
134                .cache_capacity
135                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
136            (name, path, capacity)
137        }
138        ObjectStoreConfig::Gcs(gcs_config) => {
139            let path = gcs_config.cache.cache_path.clone();
140            let name = &gcs_config.name;
141            let capacity = gcs_config
142                .cache
143                .cache_capacity
144                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
145            (name, path, capacity)
146        }
147        _ => unreachable!("Already checked above"),
148    };
149
150    // Enable object cache by default
151    // Set the cache_path to be `${data_home}` by default
152    // if it's not present
153    if cache_path.is_none() {
154        let read_cache_path = data_home.to_string();
155        tokio::fs::create_dir_all(Path::new(&read_cache_path))
156            .await
157            .context(CreateDirSnafu {
158                dir: &read_cache_path,
159            })?;
160
161        info!(
162            "The object storage cache path is not set for '{}', using the default path: '{}'",
163            name, &read_cache_path
164        );
165
166        cache_path = Some(read_cache_path);
167    }
168
169    if let Some(path) = cache_path.as_ref()
170        && !path.trim().is_empty()
171    {
172        let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
173        clean_temp_dir(&atomic_temp_dir)?;
174
175        // Compatible code. Remove this after a major release.
176        let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
177        clean_temp_dir(&old_atomic_temp_dir)?;
178
179        let cache_store = Fs::default()
180            .root(path)
181            .atomic_write_dir(&atomic_temp_dir)
182            .build()
183            .context(error::InitBackendSnafu)?;
184
185        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
186            .context(error::InitBackendSnafu)?;
187        cache_layer.recover_cache(false).await;
188        info!(
189            "Enabled local object storage cache, path: {}, capacity: {}.",
190            path, cache_capacity
191        );
192
193        Ok(Some(cache_layer))
194    } else {
195        Ok(None)
196    }
197}
198
199pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
200    if path::Path::new(&dir).exists() {
201        info!("Begin to clean temp storage directory: {}", dir);
202        std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
203        info!("Cleaned temp storage directory: {}", dir);
204    }
205
206    Ok(())
207}
208
209pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
210    let client = reqwest::ClientBuilder::new()
211        .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
212        .connect_timeout(config.connect_timeout)
213        .pool_idle_timeout(config.pool_idle_timeout)
214        .timeout(config.timeout)
215        .build()
216        .context(BuildHttpClientSnafu)?;
217    Ok(HttpClient::with(client))
218}
219struct PrintDetailedError;
220
221// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
222impl RetryInterceptor for PrintDetailedError {
223    fn intercept(&self, err: &Error, dur: Duration) {
224        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
225    }
226}