1use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_telemetry::{debug, error, info, trace, warn};
20use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
21use opendal::raw::{AccessorInfo, HttpClient, Operation};
22use opendal::{Error, ErrorKind};
23use snafu::ResultExt;
24
25use crate::config::HttpClientConfig;
26use crate::{error, ObjectStore};
27
28pub fn join_dir(parent: &str, child: &str) -> String {
36 let output = format!("{parent}/{child}/");
38 normalize_dir(&output)
39}
40
/// Normalizes a directory path.
///
/// - Empty segments (produced by repeated `/`) are dropped.
/// - A leading `/` is kept if the input had one.
/// - The result always ends with `/`; an empty input becomes `/`.
///
/// Whitespace is not trimmed.
pub fn normalize_dir(v: &str) -> String {
    let mut dir = String::with_capacity(v.len() + 1);

    if v.starts_with('/') {
        dir.push('/');
    }

    // Every kept segment is followed by a separator, which also provides the
    // mandatory trailing slash.
    for segment in v.split('/').filter(|segment| !segment.is_empty()) {
        dir.push_str(segment);
        dir.push('/');
    }

    // No root and no segments: still return a valid directory.
    if dir.is_empty() {
        dir.push('/');
    }

    dir
}
75
76pub fn join_path(parent: &str, child: &str) -> String {
81 let output = format!("{parent}/{child}");
82 normalize_path(&output)
83}
84
/// Normalizes a path.
///
/// - Leading and trailing whitespace is trimmed.
/// - An empty (or whitespace-only) path becomes `/`.
/// - Empty segments (produced by repeated `/`) are dropped.
/// - A leading `/` and a trailing `/` are each kept only if the trimmed input
///   had them.
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();
    if trimmed.is_empty() {
        return "/".to_string();
    }

    let mut normalized = String::with_capacity(trimmed.len());

    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    for segment in trimmed.split('/').filter(|segment| !segment.is_empty()) {
        // Separate from the previous segment; nothing is needed right after
        // the leading slash or at the very beginning.
        if !normalized.is_empty() && !normalized.ends_with('/') {
            normalized.push('/');
        }
        normalized.push_str(segment);
    }

    // The `ends_with` guard avoids doubling the slash for inputs like "/".
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }

    normalized
}
126
127pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
129 object_store
130 .layer(LoggingLayer::new(DefaultLoggingInterceptor))
131 .layer(TracingLayer)
132 .layer(crate::layers::build_prometheus_metrics_layer(path_label))
133}
134
/// Log target used for every message emitted by [`DefaultLoggingInterceptor`].
static LOGGING_TARGET: &str = "opendal::services";

/// Borrowed list of key/value pairs that renders as `k1=v1 k2=v2 ...`.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes the pairs as `key=value`, separated by single spaces, with no
    /// leading or trailing space. An empty context writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut pairs = self.0.iter();

        // The first pair carries no separator...
        if let Some((key, value)) = pairs.next() {
            write!(f, "{}={}", key, value)?;
        }

        // ...every following pair is prefixed with one space.
        pairs.try_for_each(|(key, value)| write!(f, " {}={}", key, value))
    }
}
151
152#[derive(Debug, Copy, Clone, Default)]
153pub struct DefaultLoggingInterceptor;
154
155impl LoggingInterceptor for DefaultLoggingInterceptor {
156 #[inline]
157 fn log(
158 &self,
159 info: &AccessorInfo,
160 operation: Operation,
161 context: &[(&str, &str)],
162 message: &str,
163 err: Option<&opendal::Error>,
164 ) {
165 if let Some(err) = err {
166 if err.kind() == ErrorKind::Unexpected {
168 error!(
169 target: LOGGING_TARGET,
170 "service={} name={} {}: {operation} {message} {err:#?}",
171 info.scheme(),
172 info.name(),
173 LoggingContext(context),
174 );
175 } else {
176 debug!(
177 target: LOGGING_TARGET,
178 "service={} name={} {}: {operation} {message} {err}",
179 info.scheme(),
180 info.name(),
181 LoggingContext(context),
182 );
183 };
184 }
185
186 if operation.is_oneshot() {
188 debug!(
189 target: LOGGING_TARGET,
190 "service={} name={} {}: {operation} {message}",
191 info.scheme(),
192 info.name(),
193 LoggingContext(context),
194 );
195 } else {
196 trace!(
197 target: LOGGING_TARGET,
198 "service={} name={} {}: {operation} {message}",
199 info.scheme(),
200 info.name(),
201 LoggingContext(context),
202 );
203 };
204 }
205}
206
207pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
208 if config.skip_ssl_validation {
209 common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted.");
210 }
211
212 let client = reqwest::ClientBuilder::new()
213 .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
214 .connect_timeout(config.connect_timeout)
215 .pool_idle_timeout(config.pool_idle_timeout)
216 .timeout(config.timeout)
217 .danger_accept_invalid_certs(config.skip_ssl_validation)
218 .build()
219 .context(error::BuildHttpClientSnafu)?;
220 Ok(HttpClient::with(client))
221}
222
223pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
224 if path::Path::new(&dir).exists() {
225 info!("Begin to clean temp storage directory: {}", dir);
226 std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
227 info!("Cleaned temp storage directory: {}", dir);
228 }
229
230 Ok(())
231}
232
233pub struct PrintDetailedError;
235
236impl RetryInterceptor for PrintDetailedError {
238 fn intercept(&self, err: &Error, dur: Duration) {
239 warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// `normalize_dir` always yields a directory ending with `/`.
    #[test]
    fn test_normalize_dir() {
        // (input, expected)
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];

        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input), "normalize_dir({input:?})");
        }
    }

    /// `join_dir` collapses duplicate slashes and appends a trailing `/`.
    #[test]
    fn test_join_dir() {
        // (parent, child, expected)
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("/a", "", "/a/"),
            ("a/b", "c", "a/b/c/"),
            ("/a/b", "c", "/a/b/c/"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c", "/a/b/c/"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_dir(parent, child),
                "join_dir({parent:?}, {child:?})"
            );
        }
    }

    /// `join_path` trims whitespace, collapses duplicate slashes and keeps
    /// leading/trailing slashes only when present.
    #[test]
    fn test_join_path() {
        // (parent, child, expected)
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("a", "", "a/"),
            ("/", "a", "/a"),
            ("a/b", "c.txt", "a/b/c.txt"),
            ("/a/b", "c.txt", "/a/b/c.txt"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c.txt", "/a/b/c.txt"),
            (" abc", "/def ", "abc/def"),
            ("//", "/abc", "/abc"),
            ("abc/", "//def", "abc/def"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_path(parent, child),
                "join_path({parent:?}, {child:?})"
            );
        }
    }
}