datanode/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! object storage utilities
16
17use common_telemetry::{info, warn};
18use object_store::factory::new_raw_object_store;
19use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
20use object_store::{ATOMIC_WRITE_DIR, ObjectStore};
21use snafu::prelude::*;
22
23use crate::config::ObjectStoreConfig;
24use crate::error::{self, Result};
25
26pub(crate) async fn new_object_store_without_cache(
27    store: &ObjectStoreConfig,
28    data_home: &str,
29) -> Result<ObjectStore> {
30    let object_store = new_raw_object_store(store, data_home)
31        .await
32        .context(error::ObjectStoreSnafu)?;
33    // Enable retry layer and cache layer for non-fs object storages
34    let object_store = if store.is_object_storage() {
35        // Adds retry layer
36        with_retry_layers(object_store)
37    } else {
38        object_store
39    };
40
41    let object_store = with_instrument_layers(object_store, true);
42    Ok(object_store)
43}
44
45/// Cleans up old LRU read cache directories that were removed.
46fn clean_old_read_cache(store: &ObjectStoreConfig, data_home: &str) {
47    if !store.is_object_storage() {
48        return;
49    }
50
51    let Some(cache_config) = store.cache_config() else {
52        return;
53    };
54
55    // Only cleans if read cache was enabled
56    if !cache_config.enable_read_cache {
57        return;
58    }
59
60    let cache_base_dir = if cache_config.cache_path.is_empty() {
61        data_home
62    } else {
63        &cache_config.cache_path
64    };
65
66    // Cleans up the old read cache directory
67    let old_read_cache_dir = join_dir(cache_base_dir, "cache/object/read");
68    info!(
69        "Cleaning up old read cache directory: {}",
70        old_read_cache_dir
71    );
72    if let Err(e) = clean_temp_dir(&old_read_cache_dir) {
73        warn!(e; "Failed to clean old read cache directory {}", old_read_cache_dir);
74    }
75
76    // Cleans up the atomic temp dir used by the cache layer
77    let cache_atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
78    info!(
79        "Cleaning up old cache atomic temp directory: {}",
80        cache_atomic_temp_dir
81    );
82    if let Err(e) = clean_temp_dir(&cache_atomic_temp_dir) {
83        warn!(e; "Failed to clean old cache atomic temp directory {}", cache_atomic_temp_dir);
84    }
85}
86
87pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
88    // Cleans up old LRU read cache directories.
89    // TODO: Remove this line after the 1.0 release.
90    clean_old_read_cache(&store, data_home);
91
92    let object_store = new_raw_object_store(&store, data_home)
93        .await
94        .context(error::ObjectStoreSnafu)?;
95    // Enables retry layer for non-fs object storages
96    let object_store = if store.is_object_storage() {
97        // Adds retry layer
98        with_retry_layers(object_store)
99    } else {
100        object_store
101    };
102
103    let object_store = with_instrument_layers(object_store, true);
104    Ok(object_store)
105}