1use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_error::root_source;
20use common_telemetry::{debug, error, info, warn};
21use opendal::layers::{
22 LoggingInterceptor, LoggingLayer, RetryInterceptor, RetryLayer, TracingLayer,
23};
24use opendal::raw::{AccessorInfo, HttpClient, Operation};
25use opendal::{Error, ErrorKind};
26use snafu::ResultExt;
27
28use crate::config::HttpClientConfig;
29use crate::{ObjectStore, error};
30
31pub fn join_dir(parent: &str, child: &str) -> String {
39 let output = format!("{parent}/{child}/");
41 normalize_dir(&output)
42}
43
44pub fn normalize_dir(v: &str) -> String {
64 let has_root = v.starts_with('/');
65 let mut v = v
66 .split('/')
67 .filter(|v| !v.is_empty())
68 .collect::<Vec<&str>>()
69 .join("/");
70 if has_root {
71 v.insert(0, '/');
72 }
73 if !v.ends_with('/') {
74 v.push('/')
75 }
76 v
77}
78
79pub fn join_path(parent: &str, child: &str) -> String {
84 let output = format!("{parent}/{child}");
85 normalize_path(&output)
86}
87
88pub fn normalize_path(path: &str) -> String {
100 let path = path.trim();
102
103 if path.is_empty() {
105 return "/".to_string();
106 }
107
108 let has_leading = path.starts_with('/');
109 let has_trailing = path.ends_with('/');
110
111 let mut p = path
112 .split('/')
113 .filter(|v| !v.is_empty())
114 .collect::<Vec<_>>()
115 .join("/");
116
117 if !p.starts_with('/') && has_leading {
119 p.insert(0, '/');
120 }
121
122 if !p.ends_with('/') && has_trailing {
124 p.push('/');
125 }
126
127 p
128}
129
130pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
132 object_store
133 .layer(LoggingLayer::new(DefaultLoggingInterceptor))
134 .layer(TracingLayer)
135 .layer(crate::layers::build_prometheus_metrics_layer(path_label))
136}
137
138pub fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
140 object_store.layer(
141 RetryLayer::new()
142 .with_jitter()
143 .with_notify(PrintDetailedError),
144 )
145}
146
147static LOGGING_TARGET: &str = "opendal::services";
148
149struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
150
151impl Display for LoggingContext<'_> {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 for (i, (k, v)) in self.0.iter().enumerate() {
154 if i > 0 {
155 write!(f, " {}={}", k, v)?;
156 } else {
157 write!(f, "{}={}", k, v)?;
158 }
159 }
160 Ok(())
161 }
162}
163
164#[derive(Debug, Copy, Clone, Default)]
165pub struct DefaultLoggingInterceptor;
166
167impl LoggingInterceptor for DefaultLoggingInterceptor {
168 #[inline]
169 fn log(
170 &self,
171 info: &AccessorInfo,
172 operation: Operation,
173 context: &[(&str, &str)],
174 message: &str,
175 err: Option<&opendal::Error>,
176 ) {
177 if let Some(err) = err {
178 let root = root_source(err);
179 if err.kind() == ErrorKind::Unexpected {
181 error!(
182 target: LOGGING_TARGET,
183 "service={} name={} {}: {operation} {message} {err:#?}, root={root:#?}",
184 info.scheme(),
185 info.name(),
186 LoggingContext(context),
187 );
188 } else {
189 debug!(
190 target: LOGGING_TARGET,
191 "service={} name={} {}: {operation} {message} {err}, root={root:?}",
192 info.scheme(),
193 info.name(),
194 LoggingContext(context),
195 );
196 };
197 }
198
199 debug!(
200 target: LOGGING_TARGET,
201 "service={} name={} {}: {operation} {message}",
202 info.scheme(),
203 info.name(),
204 LoggingContext(context),
205 );
206 }
207}
208
209pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
210 if config.skip_ssl_validation {
211 common_telemetry::warn!(
212 "Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted."
213 );
214 }
215
216 let client = reqwest::ClientBuilder::new()
217 .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
218 .connect_timeout(config.connect_timeout)
219 .pool_idle_timeout(config.pool_idle_timeout)
220 .timeout(config.timeout)
221 .danger_accept_invalid_certs(config.skip_ssl_validation)
222 .build()
223 .context(error::BuildHttpClientSnafu)?;
224 Ok(HttpClient::with(client))
225}
226
227pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
228 if path::Path::new(&dir).exists() {
229 info!("Begin to clean temp storage directory: {}", dir);
230 std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
231 info!("Cleaned temp storage directory: {}", dir);
232 }
233
234 Ok(())
235}
236
237pub struct PrintDetailedError;
239
240impl RetryInterceptor for PrintDetailedError {
242 fn intercept(&self, err: &Error, dur: Duration) {
243 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250
251 #[test]
252 fn test_normalize_dir() {
253 assert_eq!("/", normalize_dir("/"));
254 assert_eq!("/", normalize_dir(""));
255 assert_eq!("/test/", normalize_dir("/test"));
256 }
257
258 #[test]
259 fn test_join_dir() {
260 assert_eq!("/", join_dir("", ""));
261 assert_eq!("/", join_dir("/", ""));
262 assert_eq!("/", join_dir("", "/"));
263 assert_eq!("/", join_dir("/", "/"));
264 assert_eq!("/a/", join_dir("/a", ""));
265 assert_eq!("a/b/c/", join_dir("a/b", "c"));
266 assert_eq!("/a/b/c/", join_dir("/a/b", "c"));
267 assert_eq!("/a/b/c/", join_dir("/a/b", "c/"));
268 assert_eq!("/a/b/c/", join_dir("/a/b", "/c/"));
269 assert_eq!("/a/b/c/", join_dir("/a/b", "//c"));
270 }
271
272 #[test]
273 fn test_join_path() {
274 assert_eq!("/", join_path("", ""));
275 assert_eq!("/", join_path("/", ""));
276 assert_eq!("/", join_path("", "/"));
277 assert_eq!("/", join_path("/", "/"));
278 assert_eq!("a/", join_path("a", ""));
279 assert_eq!("/a", join_path("/", "a"));
280 assert_eq!("a/b/c.txt", join_path("a/b", "c.txt"));
281 assert_eq!("/a/b/c.txt", join_path("/a/b", "c.txt"));
282 assert_eq!("/a/b/c/", join_path("/a/b", "c/"));
283 assert_eq!("/a/b/c/", join_path("/a/b", "/c/"));
284 assert_eq!("/a/b/c.txt", join_path("/a/b", "//c.txt"));
285 assert_eq!("abc/def", join_path(" abc", "/def "));
286 assert_eq!("/abc", join_path("//", "/abc"));
287 assert_eq!("abc/def", join_path("abc/", "//def"));
288 }
289}