object_store/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::path;
17use std::time::Duration;
18
19use common_error::root_source;
20use common_telemetry::{debug, error, info, warn};
21use opendal::layers::{
22    LoggingInterceptor, LoggingLayer, RetryInterceptor, RetryLayer, TracingLayer,
23};
24use opendal::raw::{AccessorInfo, HttpClient, Operation};
25use opendal::{Error, ErrorKind};
26use snafu::ResultExt;
27
28use crate::config::HttpClientConfig;
29use crate::{ObjectStore, error};
30
31/// Join two paths and normalize the output dir.
32///
33/// The output dir is always ends with `/`. e.g.
34/// - `/a/b` join `c` => `/a/b/c/`
35/// - `/a/b` join `/c/` => `/a/b/c/`
36///
37/// All internal `//` will be replaced by `/`.
38pub fn join_dir(parent: &str, child: &str) -> String {
39    // Always adds a `/` to the output path.
40    let output = format!("{parent}/{child}/");
41    normalize_dir(&output)
42}
43
/// Normalizes a directory path so that it always ends with `/`.
///
/// Modified from `opendal::raw::normalize_root`.
///
/// # Difference from `normalize_root`
///
/// `normalize_root` always prepends `/`. This function keeps a leading `/` only
/// when the (trimmed) input starts with one, so relative directories stay relative.
///
/// # Normalize Rules
///
/// - Leading/trailing whitespace is trimmed: ` abc/def ` => `abc/def/`
/// - Repeated leading `/` collapse into one: `///abc` => `/abc/`
/// - Internal `//` are replaced by `/`: `abc///def` => `abc/def/`
/// - Empty (or whitespace-only) path becomes `/`: `` => `/`
/// - A trailing `/` is added if missing: `/abc` => `/abc/`, `abc` => `abc/`
pub fn normalize_dir(v: &str) -> String {
    // Trim before inspecting the first character so surrounding whitespace can
    // neither hide a leading `/` nor end up inside the first/last segment.
    let v = v.trim();
    let has_root = v.starts_with('/');
    let mut dir = v
        .split('/')
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join("/");
    if has_root {
        dir.insert(0, '/');
    }
    if !dir.ends_with('/') {
        dir.push('/');
    }
    dir
}
78
79/// Push `child` to `parent` dir and normalize the output path.
80///
81/// - Path endswith `/` means it's a dir path.
82/// - Otherwise, it's a file path.
83pub fn join_path(parent: &str, child: &str) -> String {
84    let output = format!("{parent}/{child}");
85    normalize_path(&output)
86}
87
/// Normalizes a path so that every operation is built from a canonical form.
///
/// - A path ending with `/` denotes a directory.
/// - Any other path denotes a file.
///
/// # Normalize Rules
///
/// - Leading/trailing whitespace is trimmed: ` abc/def ` => `abc/def`
/// - Repeated leading `/` collapse into one: `///abc` => `/abc`
/// - Internal `//` are replaced by `/`: `abc///def` => `abc/def`
/// - Empty path becomes `/`: `` => `/`
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();
    if trimmed.is_empty() {
        return String::from("/");
    }

    let mut normalized = String::with_capacity(trimmed.len() + 1);
    // Preserve a single leading `/` when the input had one or more.
    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    // Re-assemble the non-empty segments separated by exactly one `/`.
    let mut is_first_segment = true;
    for segment in trimmed.split('/').filter(|s| !s.is_empty()) {
        if !is_first_segment {
            normalized.push('/');
        }
        normalized.push_str(segment);
        is_first_segment = false;
    }

    // Preserve the directory marker; skip it when the result is already just `/`.
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }
    normalized
}
129
130/// Attaches instrument layers to the object store.
131pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
132    object_store
133        .layer(LoggingLayer::new(DefaultLoggingInterceptor))
134        .layer(TracingLayer)
135        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
136}
137
138/// Adds retry layer to the object store.
139pub fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
140    object_store.layer(
141        RetryLayer::new()
142            .with_jitter()
143            .with_notify(PrintDetailedError),
144    )
145}
146
/// Log target used for every record emitted by [`DefaultLoggingInterceptor`].
static LOGGING_TARGET: &str = "opendal::services";

/// Renders opendal logging context pairs as space-separated `key=value` items,
/// e.g. `[("path", "a/b"), ("written", "42")]` is displayed as `path=a/b written=42`.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Nothing precedes the first pair; a single space precedes each later pair.
        let mut separator = "";
        for (key, value) in self.0 {
            write!(f, "{separator}{key}={value}")?;
            separator = " ";
        }
        Ok(())
    }
}
163
164#[derive(Debug, Copy, Clone, Default)]
165pub struct DefaultLoggingInterceptor;
166
167impl LoggingInterceptor for DefaultLoggingInterceptor {
168    #[inline]
169    fn log(
170        &self,
171        info: &AccessorInfo,
172        operation: Operation,
173        context: &[(&str, &str)],
174        message: &str,
175        err: Option<&opendal::Error>,
176    ) {
177        if let Some(err) = err {
178            let root = root_source(err);
179            // Print error if it's unexpected, otherwise in error.
180            if err.kind() == ErrorKind::Unexpected {
181                error!(
182                    target: LOGGING_TARGET,
183                    "service={} name={} {}: {operation} {message} {err:#?}, root={root:#?}",
184                    info.scheme(),
185                    info.name(),
186                    LoggingContext(context),
187                );
188            } else {
189                debug!(
190                    target: LOGGING_TARGET,
191                    "service={} name={} {}: {operation} {message} {err}, root={root:?}",
192                    info.scheme(),
193                    info.name(),
194                    LoggingContext(context),
195                );
196            };
197        }
198
199        debug!(
200            target: LOGGING_TARGET,
201            "service={} name={} {}: {operation} {message}",
202            info.scheme(),
203            info.name(),
204            LoggingContext(context),
205        );
206    }
207}
208
209pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<HttpClient> {
210    if config.skip_ssl_validation {
211        common_telemetry::warn!(
212            "Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted."
213        );
214    }
215
216    let client = reqwest::ClientBuilder::new()
217        .pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
218        .connect_timeout(config.connect_timeout)
219        .pool_idle_timeout(config.pool_idle_timeout)
220        .timeout(config.timeout)
221        .danger_accept_invalid_certs(config.skip_ssl_validation)
222        .build()
223        .context(error::BuildHttpClientSnafu)?;
224    Ok(HttpClient::with(client))
225}
226
227pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
228    if path::Path::new(&dir).exists() {
229        info!("Begin to clean temp storage directory: {}", dir);
230        std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
231        info!("Cleaned temp storage directory: {}", dir);
232    }
233
234    Ok(())
235}
236
237/// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
238pub struct PrintDetailedError;
239
240// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
241impl RetryInterceptor for PrintDetailedError {
242    fn intercept(&self, err: &Error, dur: Duration) {
243        warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_normalize_dir() {
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];
        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input), "normalize_dir({input:?})");
        }
    }

    #[test]
    fn test_join_dir() {
        let cases = [
            (("", ""), "/"),
            (("/", ""), "/"),
            (("", "/"), "/"),
            (("/", "/"), "/"),
            (("/a", ""), "/a/"),
            (("a/b", "c"), "a/b/c/"),
            (("/a/b", "c"), "/a/b/c/"),
            (("/a/b", "c/"), "/a/b/c/"),
            (("/a/b", "/c/"), "/a/b/c/"),
            (("/a/b", "//c"), "/a/b/c/"),
        ];
        for ((parent, child), expected) in cases {
            assert_eq!(
                expected,
                join_dir(parent, child),
                "join_dir({parent:?}, {child:?})"
            );
        }
    }

    #[test]
    fn test_join_path() {
        let cases = [
            (("", ""), "/"),
            (("/", ""), "/"),
            (("", "/"), "/"),
            (("/", "/"), "/"),
            (("a", ""), "a/"),
            (("/", "a"), "/a"),
            (("a/b", "c.txt"), "a/b/c.txt"),
            (("/a/b", "c.txt"), "/a/b/c.txt"),
            (("/a/b", "c/"), "/a/b/c/"),
            (("/a/b", "/c/"), "/a/b/c/"),
            (("/a/b", "//c.txt"), "/a/b/c.txt"),
            ((" abc", "/def "), "abc/def"),
            (("//", "/abc"), "/abc"),
            (("abc/", "//def"), "abc/def"),
        ];
        for ((parent, child), expected) in cases {
            assert_eq!(
                expected,
                join_path(parent, child),
                "join_path({parent:?}, {child:?})"
            );
        }
    }
}