1mod azblob;
18pub mod fs;
19mod gcs;
20mod oss;
21mod s3;
22use std::path;
23use std::path::Path;
24use std::sync::Arc;
25use std::time::Duration;
26
27use common_telemetry::{info, warn};
28use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
29use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
30use object_store::services::Fs;
31use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
32use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
33use snafu::prelude::*;
34
35use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
36use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
37
38pub(crate) async fn new_raw_object_store(
39 store: &ObjectStoreConfig,
40 data_home: &str,
41) -> Result<ObjectStore> {
42 let data_home = normalize_dir(data_home);
43 let object_store = match store {
44 ObjectStoreConfig::File(file_config) => {
45 fs::new_fs_object_store(&data_home, file_config).await
46 }
47 ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
48 ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
49 ObjectStoreConfig::Azblob(azblob_config) => {
50 azblob::new_azblob_object_store(azblob_config).await
51 }
52 ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
53 }?;
54 Ok(object_store)
55}
56
57fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
58 object_store.layer(
59 RetryLayer::new()
60 .with_jitter()
61 .with_notify(PrintDetailedError),
62 )
63}
64
65pub(crate) async fn new_object_store_without_cache(
66 store: &ObjectStoreConfig,
67 data_home: &str,
68) -> Result<ObjectStore> {
69 let object_store = new_raw_object_store(store, data_home).await?;
70 let object_store = if store.is_object_storage() {
72 with_retry_layers(object_store)
74 } else {
75 object_store
76 };
77
78 let object_store = with_instrument_layers(object_store, true);
79 Ok(object_store)
80}
81
82pub(crate) async fn new_object_store(
83 store: ObjectStoreConfig,
84 data_home: &str,
85) -> Result<ObjectStore> {
86 let object_store = new_raw_object_store(&store, data_home).await?;
87 let object_store = if store.is_object_storage() {
89 let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
90 object_store.layer(cache_layer)
92 } else {
93 object_store
94 };
95
96 with_retry_layers(object_store)
98 } else {
99 object_store
100 };
101
102 let object_store = with_instrument_layers(object_store, true);
103 Ok(object_store)
104}
105
106async fn build_cache_layer(
107 store_config: &ObjectStoreConfig,
108 data_home: &str,
109) -> Result<Option<LruCacheLayer<impl Access>>> {
110 let (name, mut cache_path, cache_capacity) = match store_config {
111 ObjectStoreConfig::S3(s3_config) => {
112 let path = s3_config.cache.cache_path.clone();
113 let name = &s3_config.name;
114 let capacity = s3_config
115 .cache
116 .cache_capacity
117 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
118 (name, path, capacity)
119 }
120 ObjectStoreConfig::Oss(oss_config) => {
121 let path = oss_config.cache.cache_path.clone();
122 let name = &oss_config.name;
123 let capacity = oss_config
124 .cache
125 .cache_capacity
126 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
127 (name, path, capacity)
128 }
129 ObjectStoreConfig::Azblob(azblob_config) => {
130 let path = azblob_config.cache.cache_path.clone();
131 let name = &azblob_config.name;
132 let capacity = azblob_config
133 .cache
134 .cache_capacity
135 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
136 (name, path, capacity)
137 }
138 ObjectStoreConfig::Gcs(gcs_config) => {
139 let path = gcs_config.cache.cache_path.clone();
140 let name = &gcs_config.name;
141 let capacity = gcs_config
142 .cache
143 .cache_capacity
144 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
145 (name, path, capacity)
146 }
147 _ => unreachable!("Already checked above"),
148 };
149
150 if cache_path.is_none() {
154 let read_cache_path = data_home.to_string();
155 tokio::fs::create_dir_all(Path::new(&read_cache_path))
156 .await
157 .context(CreateDirSnafu {
158 dir: &read_cache_path,
159 })?;
160
161 info!(
162 "The object storage cache path is not set for '{}', using the default path: '{}'",
163 name, &read_cache_path
164 );
165
166 cache_path = Some(read_cache_path);
167 }
168
169 if let Some(path) = cache_path.as_ref()
170 && !path.trim().is_empty()
171 {
172 let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
173 clean_temp_dir(&atomic_temp_dir)?;
174
175 let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
177 clean_temp_dir(&old_atomic_temp_dir)?;
178
179 let cache_store = Fs::default()
180 .root(path)
181 .atomic_write_dir(&atomic_temp_dir)
182 .build()
183 .context(error::InitBackendSnafu)?;
184
185 let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
186 .context(error::InitBackendSnafu)?;
187 cache_layer.recover_cache(false).await;
188 info!(
189 "Enabled local object storage cache, path: {}, capacity: {}.",
190 path, cache_capacity
191 );
192
193 Ok(Some(cache_layer))
194 } else {
195 Ok(None)
196 }
197}
198
199pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
200 if path::Path::new(&dir).exists() {
201 info!("Begin to clean temp storage directory: {}", dir);
202 std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
203 info!("Cleaned temp storage directory: {}", dir);
204 }
205
206 Ok(())
207}
208
209pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
210 let client = reqwest::ClientBuilder::new()
211 .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
212 .connect_timeout(config.connect_timeout)
213 .pool_idle_timeout(config.pool_idle_timeout)
214 .timeout(config.timeout)
215 .build()
216 .context(BuildHttpClientSnafu)?;
217 Ok(HttpClient::with(client))
218}
219struct PrintDetailedError;
220
221impl RetryInterceptor for PrintDetailedError {
223 fn intercept(&self, err: &Error, dur: Duration) {
224 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
225 }
226}