1use std::path::Path;
18use std::sync::Arc;
19
20use common_telemetry::info;
21use object_store::factory::new_raw_object_store;
22use object_store::layers::{LruCacheLayer, RetryLayer};
23use object_store::services::Fs;
24use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, PrintDetailedError};
25use object_store::{
26 Access, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
27};
28use snafu::prelude::*;
29
30use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
31use crate::error::{self, CreateDirSnafu, Result};
32
33fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
34 object_store.layer(
35 RetryLayer::new()
36 .with_jitter()
37 .with_notify(PrintDetailedError),
38 )
39}
40
41pub(crate) async fn new_object_store_without_cache(
42 store: &ObjectStoreConfig,
43 data_home: &str,
44) -> Result<ObjectStore> {
45 let object_store = new_raw_object_store(store, data_home)
46 .await
47 .context(error::ObjectStoreSnafu)?;
48 let object_store = if store.is_object_storage() {
50 with_retry_layers(object_store)
52 } else {
53 object_store
54 };
55
56 let object_store = with_instrument_layers(object_store, true);
57 Ok(object_store)
58}
59
60pub(crate) async fn new_object_store(
61 store: ObjectStoreConfig,
62 data_home: &str,
63) -> Result<ObjectStore> {
64 let object_store = new_raw_object_store(&store, data_home)
65 .await
66 .context(error::ObjectStoreSnafu)?;
67 let object_store = if store.is_object_storage() {
69 let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
70 object_store.layer(cache_layer)
72 } else {
73 object_store
74 };
75
76 with_retry_layers(object_store)
78 } else {
79 object_store
80 };
81
82 let object_store = with_instrument_layers(object_store, true);
83 Ok(object_store)
84}
85
86async fn build_cache_layer(
87 store_config: &ObjectStoreConfig,
88 data_home: &str,
89) -> Result<Option<LruCacheLayer<impl Access>>> {
90 let (name, mut cache_path, cache_capacity) = match store_config {
91 ObjectStoreConfig::S3(s3_config) => {
92 let path = s3_config.cache.cache_path.clone();
93 let name = &s3_config.name;
94 let capacity = s3_config
95 .cache
96 .cache_capacity
97 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
98 (name, path, capacity)
99 }
100 ObjectStoreConfig::Oss(oss_config) => {
101 let path = oss_config.cache.cache_path.clone();
102 let name = &oss_config.name;
103 let capacity = oss_config
104 .cache
105 .cache_capacity
106 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
107 (name, path, capacity)
108 }
109 ObjectStoreConfig::Azblob(azblob_config) => {
110 let path = azblob_config.cache.cache_path.clone();
111 let name = &azblob_config.name;
112 let capacity = azblob_config
113 .cache
114 .cache_capacity
115 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
116 (name, path, capacity)
117 }
118 ObjectStoreConfig::Gcs(gcs_config) => {
119 let path = gcs_config.cache.cache_path.clone();
120 let name = &gcs_config.name;
121 let capacity = gcs_config
122 .cache
123 .cache_capacity
124 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
125 (name, path, capacity)
126 }
127 _ => unreachable!("Already checked above"),
128 };
129
130 if cache_path.is_none() {
134 let read_cache_path = data_home.to_string();
135 tokio::fs::create_dir_all(Path::new(&read_cache_path))
136 .await
137 .context(CreateDirSnafu {
138 dir: &read_cache_path,
139 })?;
140
141 info!(
142 "The object storage cache path is not set for '{}', using the default path: '{}'",
143 name, &read_cache_path
144 );
145
146 cache_path = Some(read_cache_path);
147 }
148
149 if let Some(path) = cache_path.as_ref()
150 && !path.trim().is_empty()
151 {
152 let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
153 clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
154
155 let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
157 clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
158
159 let cache_store = Fs::default()
160 .root(path)
161 .atomic_write_dir(&atomic_temp_dir)
162 .build()
163 .context(error::BuildCacheStoreSnafu)?;
164
165 let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
166 .context(error::BuildCacheStoreSnafu)?;
167 cache_layer.recover_cache(false).await;
168 info!(
169 "Enabled local object storage cache, path: {}, capacity: {}.",
170 path, cache_capacity
171 );
172
173 Ok(Some(cache_layer))
174 } else {
175 Ok(None)
176 }
177}