datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! object storage utilities
16
17use std::path::Path;
18use std::sync::Arc;
19
20use common_telemetry::info;
21use object_store::factory::new_raw_object_store;
22use object_store::layers::{LruCacheLayer, RetryLayer};
23use object_store::services::Fs;
24use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, PrintDetailedError};
25use object_store::{
26    Access, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
27};
28use snafu::prelude::*;
29
30use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
31use crate::error::{self, CreateDirSnafu, Result};
32
33fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
34    object_store.layer(
35        RetryLayer::new()
36            .with_jitter()
37            .with_notify(PrintDetailedError),
38    )
39}
40
41pub(crate) async fn new_object_store_without_cache(
42    store: &ObjectStoreConfig,
43    data_home: &str,
44) -> Result<ObjectStore> {
45    let object_store = new_raw_object_store(store, data_home)
46        .await
47        .context(error::ObjectStoreSnafu)?;
48    // Enable retry layer and cache layer for non-fs object storages
49    let object_store = if store.is_object_storage() {
50        // Adds retry layer
51        with_retry_layers(object_store)
52    } else {
53        object_store
54    };
55
56    let object_store = with_instrument_layers(object_store, true);
57    Ok(object_store)
58}
59
60pub(crate) async fn new_object_store(
61    store: ObjectStoreConfig,
62    data_home: &str,
63) -> Result<ObjectStore> {
64    let object_store = new_raw_object_store(&store, data_home)
65        .await
66        .context(error::ObjectStoreSnafu)?;
67    // Enable retry layer and cache layer for non-fs object storages
68    let object_store = if store.is_object_storage() {
69        let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
70            // Adds cache layer
71            object_store.layer(cache_layer)
72        } else {
73            object_store
74        };
75
76        // Adds retry layer
77        with_retry_layers(object_store)
78    } else {
79        object_store
80    };
81
82    let object_store = with_instrument_layers(object_store, true);
83    Ok(object_store)
84}
85
86async fn build_cache_layer(
87    store_config: &ObjectStoreConfig,
88    data_home: &str,
89) -> Result<Option<LruCacheLayer<impl Access>>> {
90    let (name, mut cache_path, cache_capacity) = match store_config {
91        ObjectStoreConfig::S3(s3_config) => {
92            let path = s3_config.cache.cache_path.clone();
93            let name = &s3_config.name;
94            let capacity = s3_config
95                .cache
96                .cache_capacity
97                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
98            (name, path, capacity)
99        }
100        ObjectStoreConfig::Oss(oss_config) => {
101            let path = oss_config.cache.cache_path.clone();
102            let name = &oss_config.name;
103            let capacity = oss_config
104                .cache
105                .cache_capacity
106                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
107            (name, path, capacity)
108        }
109        ObjectStoreConfig::Azblob(azblob_config) => {
110            let path = azblob_config.cache.cache_path.clone();
111            let name = &azblob_config.name;
112            let capacity = azblob_config
113                .cache
114                .cache_capacity
115                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
116            (name, path, capacity)
117        }
118        ObjectStoreConfig::Gcs(gcs_config) => {
119            let path = gcs_config.cache.cache_path.clone();
120            let name = &gcs_config.name;
121            let capacity = gcs_config
122                .cache
123                .cache_capacity
124                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
125            (name, path, capacity)
126        }
127        _ => unreachable!("Already checked above"),
128    };
129
130    // Enable object cache by default
131    // Set the cache_path to be `${data_home}` by default
132    // if it's not present
133    if cache_path.is_none() {
134        let read_cache_path = data_home.to_string();
135        tokio::fs::create_dir_all(Path::new(&read_cache_path))
136            .await
137            .context(CreateDirSnafu {
138                dir: &read_cache_path,
139            })?;
140
141        info!(
142            "The object storage cache path is not set for '{}', using the default path: '{}'",
143            name, &read_cache_path
144        );
145
146        cache_path = Some(read_cache_path);
147    }
148
149    if let Some(path) = cache_path.as_ref()
150        && !path.trim().is_empty()
151    {
152        let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
153        clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
154
155        // Compatible code. Remove this after a major release.
156        let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
157        clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
158
159        let cache_store = Fs::default()
160            .root(path)
161            .atomic_write_dir(&atomic_temp_dir)
162            .build()
163            .context(error::BuildCacheStoreSnafu)?;
164
165        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
166            .context(error::BuildCacheStoreSnafu)?;
167        cache_layer.recover_cache(false).await;
168        info!(
169            "Enabled local object storage cache, path: {}, capacity: {}.",
170            path, cache_capacity
171        );
172
173        Ok(Some(cache_layer))
174    } else {
175        Ok(None)
176    }
177}