1use std::path::Path;
18use std::sync::Arc;
19use std::time::Duration;
20
21use common_telemetry::{info, warn};
22use object_store::factory::new_raw_object_store;
23use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
24use object_store::services::Fs;
25use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
26use object_store::{
27 Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
28};
29use snafu::prelude::*;
30
31use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
32use crate::error::{self, CreateDirSnafu, Result};
33
34fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
35 object_store.layer(
36 RetryLayer::new()
37 .with_jitter()
38 .with_notify(PrintDetailedError),
39 )
40}
41
42pub(crate) async fn new_object_store_without_cache(
43 store: &ObjectStoreConfig,
44 data_home: &str,
45) -> Result<ObjectStore> {
46 let object_store = new_raw_object_store(store, data_home)
47 .await
48 .context(error::ObjectStoreSnafu)?;
49 let object_store = if store.is_object_storage() {
51 with_retry_layers(object_store)
53 } else {
54 object_store
55 };
56
57 let object_store = with_instrument_layers(object_store, true);
58 Ok(object_store)
59}
60
61pub(crate) async fn new_object_store(
62 store: ObjectStoreConfig,
63 data_home: &str,
64) -> Result<ObjectStore> {
65 let object_store = new_raw_object_store(&store, data_home)
66 .await
67 .context(error::ObjectStoreSnafu)?;
68 let object_store = if store.is_object_storage() {
70 let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
71 object_store.layer(cache_layer)
73 } else {
74 object_store
75 };
76
77 with_retry_layers(object_store)
79 } else {
80 object_store
81 };
82
83 let object_store = with_instrument_layers(object_store, true);
84 Ok(object_store)
85}
86
87async fn build_cache_layer(
88 store_config: &ObjectStoreConfig,
89 data_home: &str,
90) -> Result<Option<LruCacheLayer<impl Access>>> {
91 let (name, mut cache_path, cache_capacity) = match store_config {
92 ObjectStoreConfig::S3(s3_config) => {
93 let path = s3_config.cache.cache_path.clone();
94 let name = &s3_config.name;
95 let capacity = s3_config
96 .cache
97 .cache_capacity
98 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
99 (name, path, capacity)
100 }
101 ObjectStoreConfig::Oss(oss_config) => {
102 let path = oss_config.cache.cache_path.clone();
103 let name = &oss_config.name;
104 let capacity = oss_config
105 .cache
106 .cache_capacity
107 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
108 (name, path, capacity)
109 }
110 ObjectStoreConfig::Azblob(azblob_config) => {
111 let path = azblob_config.cache.cache_path.clone();
112 let name = &azblob_config.name;
113 let capacity = azblob_config
114 .cache
115 .cache_capacity
116 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
117 (name, path, capacity)
118 }
119 ObjectStoreConfig::Gcs(gcs_config) => {
120 let path = gcs_config.cache.cache_path.clone();
121 let name = &gcs_config.name;
122 let capacity = gcs_config
123 .cache
124 .cache_capacity
125 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
126 (name, path, capacity)
127 }
128 _ => unreachable!("Already checked above"),
129 };
130
131 if cache_path.is_none() {
135 let read_cache_path = data_home.to_string();
136 tokio::fs::create_dir_all(Path::new(&read_cache_path))
137 .await
138 .context(CreateDirSnafu {
139 dir: &read_cache_path,
140 })?;
141
142 info!(
143 "The object storage cache path is not set for '{}', using the default path: '{}'",
144 name, &read_cache_path
145 );
146
147 cache_path = Some(read_cache_path);
148 }
149
150 if let Some(path) = cache_path.as_ref()
151 && !path.trim().is_empty()
152 {
153 let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
154 clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
155
156 let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
158 clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
159
160 let cache_store = Fs::default()
161 .root(path)
162 .atomic_write_dir(&atomic_temp_dir)
163 .build()
164 .context(error::BuildCacheStoreSnafu)?;
165
166 let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
167 .context(error::BuildCacheStoreSnafu)?;
168 cache_layer.recover_cache(false).await;
169 info!(
170 "Enabled local object storage cache, path: {}, capacity: {}.",
171 path, cache_capacity
172 );
173
174 Ok(Some(cache_layer))
175 } else {
176 Ok(None)
177 }
178}
179
180struct PrintDetailedError;
181
182impl RetryInterceptor for PrintDetailedError {
184 fn intercept(&self, err: &Error, dur: Duration) {
185 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
186 }
187}