object_store/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_telemetry::{debug, error, info, trace, warn};
20use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
21use opendal::raw::{AccessorInfo, HttpClient, Operation};
22use opendal::{Error, ErrorKind};
23use snafu::ResultExt;
24
25use crate::config::HttpClientConfig;
26use crate::{error, ObjectStore};
27
28/// Join two paths and normalize the output dir.
29///
30/// The output dir is always ends with `/`. e.g.
31/// - `/a/b` join `c` => `/a/b/c/`
32/// - `/a/b` join `/c/` => `/a/b/c/`
33///
34/// All internal `//` will be replaced by `/`.
35pub fn join_dir(parent: &str, child: &str) -> String {
36    // Always adds a `/` to the output path.
37    let output = format!("{parent}/{child}/");
38    normalize_dir(&output)
39}
40
/// Normalizes a directory path. Adapted from `opendal::raw::normalize_root`.
///
/// # Difference from the OpenDAL version
///
/// A leading `/` is not forced onto the result: it is kept only when the input
/// itself starts with `/`, so relative directories stay relative.
///
/// # Normalize Rules
///
/// - Repeated leading `/` collapse into one: `///abc` => `/abc/`
/// - Internal `//` is replaced by `/`: `abc///def` => `abc/def/`
/// - An empty path becomes `/`: `` => `/`
/// - A trailing `/` is added when missing: `/abc` => `/abc/`, `abc` => `abc/`
/// - Whitespace is left untouched: ` abc/def ` => ` abc/def /`
///
/// The result therefore looks like `/path/to/root/` or `path/to/root/`.
pub fn normalize_dir(v: &str) -> String {
    // Output is at most one byte longer than the input (the added trailing `/`).
    let mut dir = String::with_capacity(v.len() + 1);

    if v.starts_with('/') {
        dir.push('/');
    }

    // Re-emit every non-empty segment followed by a single separator.
    for segment in v.split('/').filter(|segment| !segment.is_empty()) {
        dir.push_str(segment);
        dir.push('/');
    }

    // No root marker and no segments: the empty path maps to `/`.
    if dir.is_empty() {
        dir.push('/');
    }

    dir
}
75
76/// Push `child` to `parent` dir and normalize the output path.
77///
78/// - Path endswith `/` means it's a dir path.
79/// - Otherwise, it's a file path.
80pub fn join_path(parent: &str, child: &str) -> String {
81    let output = format!("{parent}/{child}");
82    normalize_path(&output)
83}
84
/// Normalizes a path so that every operation is built from a canonical form:
///
/// - A path ending with `/` is a directory path.
/// - Any other path is a file path.
///
/// # Normalize Rules
///
/// - Surrounding whitespace is trimmed: ` abc/def ` => `abc/def`
/// - Repeated leading `/` collapse into one: `///abc` => `/abc`
/// - Internal `//` is replaced by `/`: `abc///def` => `abc/def`
/// - A trailing `/` is preserved (as a single `/`): `abc//` => `abc/`
/// - An empty (or whitespace-only) path becomes `/`: `` => `/`
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();

    // Nothing left after trimming: fall back to the root path.
    if trimmed.is_empty() {
        return String::from("/");
    }

    let mut normalized = String::with_capacity(trimmed.len());

    // Keep exactly one leading `/` when the input was rooted.
    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    // Write the non-empty segments back, separated by a single `/`.
    let segments = trimmed.split('/').filter(|segment| !segment.is_empty());
    for (index, segment) in segments.enumerate() {
        if index > 0 {
            normalized.push('/');
        }
        normalized.push_str(segment);
    }

    // Restore the directory marker unless the root `/` already provides it.
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }

    normalized
}
126
127/// Attaches instrument layers to the object store.
128pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
129    object_store
130        .layer(LoggingLayer::new(DefaultLoggingInterceptor))
131        .layer(TracingLayer)
132        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
133}
134
/// Log target attached to every message emitted by the logging interceptor in this file.
static LOGGING_TARGET: &str = "opendal::services";

/// Borrowed list of `(key, value)` pairs that renders as space-separated
/// `key=value` tokens, e.g. `path=a/b size=10`.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes the pairs in order; only the separators between pairs are emitted,
    /// so an empty context renders as an empty string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut pairs = self.0.iter();

        // The first pair is written without a leading separator.
        if let Some((key, value)) = pairs.next() {
            write!(f, "{}={}", key, value)?;
        }

        for (key, value) in pairs {
            write!(f, " {}={}", key, value)?;
        }

        Ok(())
    }
}
151
152#[derive(Debug, Copy, Clone, Default)]
153pub struct DefaultLoggingInterceptor;
154
155impl LoggingInterceptor for DefaultLoggingInterceptor {
156    #[inline]
157    fn log(
158        &self,
159        info: &AccessorInfo,
160        operation: Operation,
161        context: &[(&str, &str)],
162        message: &str,
163        err: Option<&opendal::Error>,
164    ) {
165        if let Some(err) = err {
166            // Print error if it's unexpected, otherwise in error.
167            if err.kind() == ErrorKind::Unexpected {
168                error!(
169                    target: LOGGING_TARGET,
170                    "service={} name={} {}: {operation} {message} {err:#?}",
171                    info.scheme(),
172                    info.name(),
173                    LoggingContext(context),
174                );
175            } else {
176                debug!(
177                    target: LOGGING_TARGET,
178                    "service={} name={} {}: {operation} {message} {err}",
179                    info.scheme(),
180                    info.name(),
181                    LoggingContext(context),
182                );
183            };
184        }
185
186        // Print debug message if operation is oneshot, otherwise in trace.
187        if operation.is_oneshot() {
188            debug!(
189                target: LOGGING_TARGET,
190                "service={} name={} {}: {operation} {message}",
191                info.scheme(),
192                info.name(),
193                LoggingContext(context),
194            );
195        } else {
196            trace!(
197                target: LOGGING_TARGET,
198                "service={} name={} {}: {operation} {message}",
199                info.scheme(),
200                info.name(),
201                LoggingContext(context),
202            );
203        };
204    }
205}
206
207pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
208    if config.skip_ssl_validation {
209        common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted.");
210    }
211
212    let client = reqwest::ClientBuilder::new()
213        .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
214        .connect_timeout(config.connect_timeout)
215        .pool_idle_timeout(config.pool_idle_timeout)
216        .timeout(config.timeout)
217        .danger_accept_invalid_certs(config.skip_ssl_validation)
218        .build()
219        .context(error::BuildHttpClientSnafu)?;
220    Ok(HttpClient::with(client))
221}
222
223pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
224    if path::Path::new(&dir).exists() {
225        info!("Begin to clean temp storage directory: {}", dir);
226        std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
227        info!("Cleaned temp storage directory: {}", dir);
228    }
229
230    Ok(())
231}
232
233/// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
234pub struct PrintDetailedError;
235
236// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
237impl RetryInterceptor for PrintDetailedError {
238    fn intercept(&self, err: &Error, dur: Duration) {
239        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `normalize_dir` against `(input, expected)` pairs.
    #[test]
    fn test_normalize_dir() {
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];

        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input), "input: {input:?}");
        }
    }

    /// Checks `join_dir` against `(parent, child, expected)` triples.
    #[test]
    fn test_join_dir() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("/a", "", "/a/"),
            ("a/b", "c", "a/b/c/"),
            ("/a/b", "c", "/a/b/c/"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c", "/a/b/c/"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_dir(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }

    /// Checks `join_path` against `(parent, child, expected)` triples.
    #[test]
    fn test_join_path() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("a", "", "a/"),
            ("/", "a", "/a"),
            ("a/b", "c.txt", "a/b/c.txt"),
            ("/a/b", "c.txt", "/a/b/c.txt"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c.txt", "/a/b/c.txt"),
            (" abc", "/def ", "abc/def"),
            ("//", "/abc", "/abc"),
            ("abc/", "//def", "abc/def"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_path(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }
}