1use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_telemetry::{debug, error, info, warn};
20use opendal::layers::{
21 LoggingInterceptor, LoggingLayer, RetryInterceptor, RetryLayer, TracingLayer,
22};
23use opendal::raw::{AccessorInfo, HttpClient, Operation};
24use opendal::{Error, ErrorKind};
25use snafu::ResultExt;
26
27use crate::config::HttpClientConfig;
28use crate::{ObjectStore, error};
29
30pub fn join_dir(parent: &str, child: &str) -> String {
38 let output = format!("{parent}/{child}/");
40 normalize_dir(&output)
41}
42
/// Normalizes a directory path.
///
/// - Empty segments (produced by repeated `/`) are dropped.
/// - A leading `/` is kept when the input starts with one.
/// - The result always ends with `/`; an input with no segments yields `"/"`.
///
/// Unlike [`normalize_path`], surrounding whitespace is not trimmed.
pub fn normalize_dir(v: &str) -> String {
    // Worst case we append one extra trailing slash to the input.
    let mut dir = String::with_capacity(v.len() + 1);

    if v.starts_with('/') {
        dir.push('/');
    }

    // Every kept segment is terminated by a slash, which also provides the
    // mandatory trailing slash for non-empty results.
    for segment in v.split('/').filter(|segment| !segment.is_empty()) {
        dir.push_str(segment);
        dir.push('/');
    }

    // A relative input without any segment (e.g. "") still maps to "/".
    if dir.is_empty() {
        dir.push('/');
    }

    dir
}
77
78pub fn join_path(parent: &str, child: &str) -> String {
83 let output = format!("{parent}/{child}");
84 normalize_path(&output)
85}
86
/// Normalizes a path.
///
/// - Leading and trailing whitespace is trimmed first.
/// - An empty (or whitespace-only) input yields `"/"`.
/// - Empty segments (produced by repeated `/`) are dropped.
/// - A leading `/` and a trailing `/` are kept only when the trimmed input
///   has them.
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();
    if trimmed.is_empty() {
        return "/".to_string();
    }

    let mut normalized = String::with_capacity(trimmed.len());

    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    for segment in trimmed.split('/').filter(|segment| !segment.is_empty()) {
        // Separate from the previous segment, but do not double the root slash.
        if !normalized.is_empty() && !normalized.ends_with('/') {
            normalized.push('/');
        }
        normalized.push_str(segment);
    }

    // Inputs made only of slashes already end with the root slash.
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }

    normalized
}
128
129pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
131 object_store
132 .layer(LoggingLayer::new(DefaultLoggingInterceptor))
133 .layer(TracingLayer)
134 .layer(crate::layers::build_prometheus_metrics_layer(path_label))
135}
136
137pub fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
139 object_store.layer(
140 RetryLayer::new()
141 .with_jitter()
142 .with_notify(PrintDetailedError),
143 )
144}
145
/// Log target attached to every record emitted by `DefaultLoggingInterceptor::log`.
static LOGGING_TARGET: &str = "opendal::services";
147
/// Borrowed list of `(key, value)` pairs that renders as `k1=v1 k2=v2 ...`.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes the pairs as `key=value`, separated by single spaces, with no
    /// leading or trailing space. An empty list writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut pairs = self.0.iter();

        // The first pair is written without a separator ...
        if let Some((key, value)) = pairs.next() {
            write!(f, "{}={}", key, value)?;
        }

        // ... and every following pair is prefixed with a space.
        for (key, value) in pairs {
            write!(f, " {}={}", key, value)?;
        }

        Ok(())
    }
}
162
163#[derive(Debug, Copy, Clone, Default)]
164pub struct DefaultLoggingInterceptor;
165
166impl LoggingInterceptor for DefaultLoggingInterceptor {
167 #[inline]
168 fn log(
169 &self,
170 info: &AccessorInfo,
171 operation: Operation,
172 context: &[(&str, &str)],
173 message: &str,
174 err: Option<&opendal::Error>,
175 ) {
176 if let Some(err) = err {
177 if err.kind() == ErrorKind::Unexpected {
179 error!(
180 target: LOGGING_TARGET,
181 "service={} name={} {}: {operation} {message} {err:#?}",
182 info.scheme(),
183 info.name(),
184 LoggingContext(context),
185 );
186 } else {
187 debug!(
188 target: LOGGING_TARGET,
189 "service={} name={} {}: {operation} {message} {err}",
190 info.scheme(),
191 info.name(),
192 LoggingContext(context),
193 );
194 };
195 }
196
197 debug!(
198 target: LOGGING_TARGET,
199 "service={} name={} {}: {operation} {message}",
200 info.scheme(),
201 info.name(),
202 LoggingContext(context),
203 );
204 }
205}
206
207pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
208 if config.skip_ssl_validation {
209 common_telemetry::warn!(
210 "Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted."
211 );
212 }
213
214 let client = reqwest::ClientBuilder::new()
215 .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
216 .connect_timeout(config.connect_timeout)
217 .pool_idle_timeout(config.pool_idle_timeout)
218 .timeout(config.timeout)
219 .danger_accept_invalid_certs(config.skip_ssl_validation)
220 .build()
221 .context(error::BuildHttpClientSnafu)?;
222 Ok(HttpClient::with(client))
223}
224
225pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
226 if path::Path::new(&dir).exists() {
227 info!("Begin to clean temp storage directory: {}", dir);
228 std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
229 info!("Cleaned temp storage directory: {}", dir);
230 }
231
232 Ok(())
233}
234
235pub struct PrintDetailedError;
237
238impl RetryInterceptor for PrintDetailedError {
240 fn intercept(&self, err: &Error, dur: Duration) {
241 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// `normalize_dir` always yields a slash-terminated directory.
    #[test]
    fn test_normalize_dir() {
        // (input, expected)
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];

        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input));
        }
    }

    /// `join_dir` collapses separators and always ends with a slash.
    #[test]
    fn test_join_dir() {
        // (parent, child, expected)
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("/a", "", "/a/"),
            ("a/b", "c", "a/b/c/"),
            ("/a/b", "c", "/a/b/c/"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c", "/a/b/c/"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(expected, join_dir(parent, child));
        }
    }

    /// `join_path` trims whitespace, collapses separators and keeps the
    /// leading/trailing slash of the joined input.
    #[test]
    fn test_join_path() {
        // (parent, child, expected)
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("a", "", "a/"),
            ("/", "a", "/a"),
            ("a/b", "c.txt", "a/b/c.txt"),
            ("/a/b", "c.txt", "/a/b/c.txt"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c.txt", "/a/b/c.txt"),
            (" abc", "/def ", "abc/def"),
            ("//", "/abc", "/abc"),
            ("abc/", "//def", "abc/def"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(expected, join_path(parent, child));
        }
    }
}