1mod azblob;
18pub mod fs;
19mod gcs;
20mod oss;
21mod s3;
22use std::path;
23use std::path::Path;
24use std::sync::Arc;
25use std::time::Duration;
26
27use common_telemetry::{info, warn};
28use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
29use object_store::services::Fs;
30use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
31use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
32use snafu::prelude::*;
33
34use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
35use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
36
37pub(crate) async fn new_raw_object_store(
38 store: &ObjectStoreConfig,
39 data_home: &str,
40) -> Result<ObjectStore> {
41 let data_home = normalize_dir(data_home);
42 let object_store = match store {
43 ObjectStoreConfig::File(file_config) => {
44 fs::new_fs_object_store(&data_home, file_config).await
45 }
46 ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
47 ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
48 ObjectStoreConfig::Azblob(azblob_config) => {
49 azblob::new_azblob_object_store(azblob_config).await
50 }
51 ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
52 }?;
53 Ok(object_store)
54}
55
56fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
57 object_store.layer(
58 RetryLayer::new()
59 .with_jitter()
60 .with_notify(PrintDetailedError),
61 )
62}
63
64pub(crate) async fn new_object_store_without_cache(
65 store: &ObjectStoreConfig,
66 data_home: &str,
67) -> Result<ObjectStore> {
68 let object_store = new_raw_object_store(store, data_home).await?;
69 let object_store = if store.is_object_storage() {
71 with_retry_layers(object_store)
73 } else {
74 object_store
75 };
76
77 let object_store = with_instrument_layers(object_store, true);
78 Ok(object_store)
79}
80
81pub(crate) async fn new_object_store(
82 store: ObjectStoreConfig,
83 data_home: &str,
84) -> Result<ObjectStore> {
85 let object_store = new_raw_object_store(&store, data_home).await?;
86 let object_store = if store.is_object_storage() {
88 let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
89 object_store.layer(cache_layer)
91 } else {
92 object_store
93 };
94
95 with_retry_layers(object_store)
97 } else {
98 object_store
99 };
100
101 let object_store = with_instrument_layers(object_store, true);
102 Ok(object_store)
103}
104
105async fn build_cache_layer(
106 store_config: &ObjectStoreConfig,
107 data_home: &str,
108) -> Result<Option<LruCacheLayer<impl Access>>> {
109 let (name, mut cache_path, cache_capacity) = match store_config {
110 ObjectStoreConfig::S3(s3_config) => {
111 let path = s3_config.cache.cache_path.clone();
112 let name = &s3_config.name;
113 let capacity = s3_config
114 .cache
115 .cache_capacity
116 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
117 (name, path, capacity)
118 }
119 ObjectStoreConfig::Oss(oss_config) => {
120 let path = oss_config.cache.cache_path.clone();
121 let name = &oss_config.name;
122 let capacity = oss_config
123 .cache
124 .cache_capacity
125 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
126 (name, path, capacity)
127 }
128 ObjectStoreConfig::Azblob(azblob_config) => {
129 let path = azblob_config.cache.cache_path.clone();
130 let name = &azblob_config.name;
131 let capacity = azblob_config
132 .cache
133 .cache_capacity
134 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
135 (name, path, capacity)
136 }
137 ObjectStoreConfig::Gcs(gcs_config) => {
138 let path = gcs_config.cache.cache_path.clone();
139 let name = &gcs_config.name;
140 let capacity = gcs_config
141 .cache
142 .cache_capacity
143 .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
144 (name, path, capacity)
145 }
146 _ => unreachable!("Already checked above"),
147 };
148
149 if cache_path.is_none() {
153 let read_cache_path = data_home.to_string();
154 tokio::fs::create_dir_all(Path::new(&read_cache_path))
155 .await
156 .context(CreateDirSnafu {
157 dir: &read_cache_path,
158 })?;
159
160 info!(
161 "The object storage cache path is not set for '{}', using the default path: '{}'",
162 name, &read_cache_path
163 );
164
165 cache_path = Some(read_cache_path);
166 }
167
168 if let Some(path) = cache_path.as_ref()
169 && !path.trim().is_empty()
170 {
171 let atomic_temp_dir = join_dir(path, ".tmp/");
172 clean_temp_dir(&atomic_temp_dir)?;
173
174 let cache_store = Fs::default()
175 .root(path)
176 .atomic_write_dir(&atomic_temp_dir)
177 .build()
178 .context(error::InitBackendSnafu)?;
179
180 let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
181 .context(error::InitBackendSnafu)?;
182 cache_layer.recover_cache(false).await;
183 info!(
184 "Enabled local object storage cache, path: {}, capacity: {}.",
185 path, cache_capacity
186 );
187
188 Ok(Some(cache_layer))
189 } else {
190 Ok(None)
191 }
192}
193
194pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
195 if path::Path::new(&dir).exists() {
196 info!("Begin to clean temp storage directory: {}", dir);
197 std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
198 info!("Cleaned temp storage directory: {}", dir);
199 }
200
201 Ok(())
202}
203
204pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
205 let client = reqwest::ClientBuilder::new()
206 .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
207 .connect_timeout(config.connect_timeout)
208 .pool_idle_timeout(config.pool_idle_timeout)
209 .timeout(config.timeout)
210 .build()
211 .context(BuildHttpClientSnafu)?;
212 Ok(HttpClient::with(client))
213}
214struct PrintDetailedError;
215
216impl RetryInterceptor for PrintDetailedError {
218 fn intercept(&self, err: &Error, dur: Duration) {
219 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
220 }
221}