object_store/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_telemetry::{debug, error, info, warn};
20use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
21use opendal::raw::{AccessorInfo, HttpClient, Operation};
22use opendal::{Error, ErrorKind};
23use snafu::ResultExt;
24
25use crate::config::HttpClientConfig;
26use crate::{error, ObjectStore};
27
28/// Join two paths and normalize the output dir.
29///
30/// The output dir is always ends with `/`. e.g.
31/// - `/a/b` join `c` => `/a/b/c/`
32/// - `/a/b` join `/c/` => `/a/b/c/`
33///
34/// All internal `//` will be replaced by `/`.
35pub fn join_dir(parent: &str, child: &str) -> String {
36    // Always adds a `/` to the output path.
37    let output = format!("{parent}/{child}/");
38    normalize_dir(&output)
39}
40
/// Normalizes a directory path; adapted from `opendal::raw::normalize_root`.
///
/// # Difference from the upstream function
///
/// A leading `/` is not forced onto the output: it is kept only when the
/// input itself starts with `/`.
///
/// # Normalize Rules
///
/// - Repeated leading `/` collapse into one: `///abc` => `/abc/`
/// - Internal repeated `/` collapse into one: `abc///def` => `abc/def/`
/// - A trailing `/` is always present: `/abc` => `/abc/`, `abc` => `abc/`
/// - An empty path becomes `/`: `` => `/`
///
/// Whitespace is left untouched: ` abc ` => ` abc /`.
pub fn normalize_dir(v: &str) -> String {
    // Every kept segment is followed by one `/`, plus an optional leading one.
    let mut dir = String::with_capacity(v.len() + 1);

    if v.starts_with('/') {
        dir.push('/');
    }

    // Empty segments come from leading, trailing or repeated `/`; skip them.
    for segment in v.split('/').filter(|segment| !segment.is_empty()) {
        dir.push_str(segment);
        dir.push('/');
    }

    // Only an empty input reaches this point with nothing written.
    if dir.is_empty() {
        dir.push('/');
    }

    dir
}
75
76/// Push `child` to `parent` dir and normalize the output path.
77///
78/// - Path endswith `/` means it's a dir path.
79/// - Otherwise, it's a file path.
80pub fn join_path(parent: &str, child: &str) -> String {
81    let output = format!("{parent}/{child}");
82    normalize_path(&output)
83}
84
/// Normalizes a path so that every operation is built from a canonical form:
///
/// - A path ending with `/` is a directory path.
/// - Any other path is a file path.
///
/// # Normalize Rules
///
/// - Surrounding whitespace is trimmed: ` abc/def ` => `abc/def`
/// - Repeated leading `/` collapse into one: `///abc` => `/abc`
/// - Internal repeated `/` collapse into one: `abc///def` => `abc/def`
/// - A leading or trailing `/` on the input is preserved (as a single `/`)
/// - An empty (or whitespace-only) path becomes `/`: `` => `/`
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();

    // Nothing left after trimming: treat it as the root.
    if trimmed.is_empty() {
        return String::from("/");
    }

    let mut normalized = String::with_capacity(trimmed.len());

    // Keep a single leading `/` when the input had one (or several).
    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    // Re-assemble the non-empty segments with exactly one `/` between them.
    let mut segments = trimmed.split('/').filter(|segment| !segment.is_empty());
    if let Some(first) = segments.next() {
        normalized.push_str(first);
    }
    for segment in segments {
        normalized.push('/');
        normalized.push_str(segment);
    }

    // Keep a single trailing `/` when the input had one; the second check
    // avoids doubling it for inputs made only of `/`.
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }

    normalized
}
126
127/// Attaches instrument layers to the object store.
128pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
129    object_store
130        .layer(LoggingLayer::new(DefaultLoggingInterceptor))
131        .layer(TracingLayer)
132        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
133}
134
/// Log target used for every message emitted by the logging interceptor below.
static LOGGING_TARGET: &str = "opendal::services";

/// Borrowed key/value pairs that render as space-separated `key=value` tokens.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes the pairs as `k1=v1 k2=v2 ...`; an empty slice writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut pairs = self.0.iter();

        // The first pair carries no separator; every later one is prefixed
        // with a single space.
        if let Some((key, value)) = pairs.next() {
            write!(f, "{key}={value}")?;
        }
        for (key, value) in pairs {
            write!(f, " {key}={value}")?;
        }

        Ok(())
    }
}
151
152#[derive(Debug, Copy, Clone, Default)]
153pub struct DefaultLoggingInterceptor;
154
155impl LoggingInterceptor for DefaultLoggingInterceptor {
156    #[inline]
157    fn log(
158        &self,
159        info: &AccessorInfo,
160        operation: Operation,
161        context: &[(&str, &str)],
162        message: &str,
163        err: Option<&opendal::Error>,
164    ) {
165        if let Some(err) = err {
166            // Print error if it's unexpected, otherwise in error.
167            if err.kind() == ErrorKind::Unexpected {
168                error!(
169                    target: LOGGING_TARGET,
170                    "service={} name={} {}: {operation} {message} {err:#?}",
171                    info.scheme(),
172                    info.name(),
173                    LoggingContext(context),
174                );
175            } else {
176                debug!(
177                    target: LOGGING_TARGET,
178                    "service={} name={} {}: {operation} {message} {err}",
179                    info.scheme(),
180                    info.name(),
181                    LoggingContext(context),
182                );
183            };
184        }
185
186        debug!(
187            target: LOGGING_TARGET,
188            "service={} name={} {}: {operation} {message}",
189            info.scheme(),
190            info.name(),
191            LoggingContext(context),
192        );
193    }
194}
195
196pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
197    if config.skip_ssl_validation {
198        common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted.");
199    }
200
201    let client = reqwest::ClientBuilder::new()
202        .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
203        .connect_timeout(config.connect_timeout)
204        .pool_idle_timeout(config.pool_idle_timeout)
205        .timeout(config.timeout)
206        .danger_accept_invalid_certs(config.skip_ssl_validation)
207        .build()
208        .context(error::BuildHttpClientSnafu)?;
209    Ok(HttpClient::with(client))
210}
211
212pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
213    if path::Path::new(&dir).exists() {
214        info!("Begin to clean temp storage directory: {}", dir);
215        std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
216        info!("Cleaned temp storage directory: {}", dir);
217    }
218
219    Ok(())
220}
221
222/// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
223pub struct PrintDetailedError;
224
225// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
226impl RetryInterceptor for PrintDetailedError {
227    fn intercept(&self, err: &Error, dur: Duration) {
228        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Each case is `(input, expected)`.
    #[test]
    fn test_normalize_dir() {
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];

        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input), "input: {input:?}");
        }
    }

    /// Each case is `(parent, child, expected)`.
    #[test]
    fn test_join_dir() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("/a", "", "/a/"),
            ("a/b", "c", "a/b/c/"),
            ("/a/b", "c", "/a/b/c/"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c", "/a/b/c/"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_dir(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }

    /// Each case is `(parent, child, expected)`.
    #[test]
    fn test_join_path() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("a", "", "a/"),
            ("/", "a", "/a"),
            ("a/b", "c.txt", "a/b/c.txt"),
            ("/a/b", "c.txt", "/a/b/c.txt"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c.txt", "/a/b/c.txt"),
            (" abc", "/def ", "abc/def"),
            ("//", "/abc", "/abc"),
            ("abc/", "//def", "abc/def"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_path(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }
}