object_store/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use common_telemetry::{debug, error, trace};
18use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
19use opendal::raw::{AccessorInfo, Operation};
20use opendal::ErrorKind;
21
22use crate::ObjectStore;
23
24/// Join two paths and normalize the output dir.
25///
26/// The output dir is always ends with `/`. e.g.
27/// - `/a/b` join `c` => `/a/b/c/`
28/// - `/a/b` join `/c/` => `/a/b/c/`
29///
30/// All internal `//` will be replaced by `/`.
31pub fn join_dir(parent: &str, child: &str) -> String {
32    // Always adds a `/` to the output path.
33    let output = format!("{parent}/{child}/");
34    normalize_dir(&output)
35}
36
/// Normalizes a directory path; adapted from `opendal::raw::normalize_root`.
///
/// # Difference from `normalize_root`
///
/// A leading `/` is not forced onto the output: it is kept only when the
/// input itself starts with `/`, so relative directories stay relative.
///
/// # Normalize Rules
///
/// - Repeated `/` (leading, internal or trailing) collapse into one:
///   `///abc` => `/abc/`, `abc///def` => `abc/def/`
/// - A single leading `/` is preserved when present: `/abc` => `/abc/`
/// - No leading `/` is added: `abc` => `abc/`
/// - A trailing `/` is always present: `/abc` => `/abc/`
/// - Empty path will be `/`: `` => `/`
///
/// Whitespace is left untouched; this function does not trim its input.
pub fn normalize_dir(v: &str) -> String {
    // At most one extra byte (the trailing `/`) is added to the input length.
    let mut dir = String::with_capacity(v.len() + 1);

    if v.starts_with('/') {
        dir.push('/');
    }

    // Every non-empty segment is followed by exactly one `/`.
    for segment in v.split('/').filter(|segment| !segment.is_empty()) {
        dir.push_str(segment);
        dir.push('/');
    }

    // A relative input without any segment (e.g. the empty string) maps to `/`.
    if dir.is_empty() {
        dir.push('/');
    }

    dir
}
71
72/// Push `child` to `parent` dir and normalize the output path.
73///
74/// - Path endswith `/` means it's a dir path.
75/// - Otherwise, it's a file path.
76pub fn join_path(parent: &str, child: &str) -> String {
77    let output = format!("{parent}/{child}");
78    normalize_path(&output)
79}
80
/// Normalizes a path so that every operation is built from the same shape:
///
/// - A path ending with `/` is a dir path.
/// - Otherwise, it is a file path.
///
/// # Normalize Rules
///
/// - Surrounding whitespace is trimmed: ` abc/def ` => `abc/def`
/// - Repeated leading `/` collapse into one: `///abc` => `/abc`
/// - Internal `//` is replaced by `/`: `abc///def` => `abc/def`
/// - A trailing `/` is kept (once) when present: `abc//` => `abc/`
/// - Empty (or whitespace-only) path will be `/`: `` => `/`
pub fn normalize_path(path: &str) -> String {
    let trimmed = path.trim();

    // Nothing left after trimming: fall back to the root.
    if trimmed.is_empty() {
        return String::from("/");
    }

    let mut normalized = String::with_capacity(trimmed.len());

    // Keep a single leading `/` only if the input had one.
    if trimmed.starts_with('/') {
        normalized.push('/');
    }

    for segment in trimmed.split('/').filter(|segment| !segment.is_empty()) {
        // Separate segments with one `/`, but do not double the leading one.
        if !normalized.is_empty() && !normalized.ends_with('/') {
            normalized.push('/');
        }
        normalized.push_str(segment);
    }

    // Restore the trailing `/` unless the output is already just `/`.
    if trimmed.ends_with('/') && !normalized.ends_with('/') {
        normalized.push('/');
    }

    normalized
}
122
123/// Attaches instrument layers to the object store.
124pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
125    object_store
126        .layer(LoggingLayer::new(DefaultLoggingInterceptor))
127        .layer(TracingLayer)
128        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
129}
130
/// Log target used for every record emitted by `DefaultLoggingInterceptor`.
static LOGGING_TARGET: &str = "opendal::services";

/// Borrowed list of `(key, value)` pairs rendered as `key=value` entries
/// separated by single spaces.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    /// Writes the pairs in order; an empty list writes nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut pairs = self.0.iter();

        // The first pair carries no separator.
        if let Some((key, value)) = pairs.next() {
            write!(f, "{}={}", key, value)?;
        }

        // Every following pair is prefixed by one space.
        for (key, value) in pairs {
            write!(f, " {}={}", key, value)?;
        }

        Ok(())
    }
}
147
148#[derive(Debug, Copy, Clone, Default)]
149pub struct DefaultLoggingInterceptor;
150
151impl LoggingInterceptor for DefaultLoggingInterceptor {
152    #[inline]
153    fn log(
154        &self,
155        info: &AccessorInfo,
156        operation: Operation,
157        context: &[(&str, &str)],
158        message: &str,
159        err: Option<&opendal::Error>,
160    ) {
161        if let Some(err) = err {
162            // Print error if it's unexpected, otherwise in error.
163            if err.kind() == ErrorKind::Unexpected {
164                error!(
165                    target: LOGGING_TARGET,
166                    "service={} name={} {}: {operation} {message} {err:#?}",
167                    info.scheme(),
168                    info.name(),
169                    LoggingContext(context),
170                );
171            } else {
172                debug!(
173                    target: LOGGING_TARGET,
174                    "service={} name={} {}: {operation} {message} {err}",
175                    info.scheme(),
176                    info.name(),
177                    LoggingContext(context),
178                );
179            };
180        }
181
182        // Print debug message if operation is oneshot, otherwise in trace.
183        if operation.is_oneshot() {
184            debug!(
185                target: LOGGING_TARGET,
186                "service={} name={} {}: {operation} {message}",
187                info.scheme(),
188                info.name(),
189                LoggingContext(context),
190            );
191        } else {
192            trace!(
193                target: LOGGING_TARGET,
194                "service={} name={} {}: {operation} {message}",
195                info.scheme(),
196                info.name(),
197                LoggingContext(context),
198            );
199        };
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// `normalize_dir`: `(input, expected)` pairs.
    #[test]
    fn test_normalize_dir() {
        let cases = [("/", "/"), ("", "/"), ("/test", "/test/")];

        for (input, expected) in cases {
            assert_eq!(expected, normalize_dir(input), "input: {input:?}");
        }
    }

    /// `join_dir`: `(parent, child, expected)` triples.
    #[test]
    fn test_join_dir() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("/a", "", "/a/"),
            ("a/b", "c", "a/b/c/"),
            ("/a/b", "c", "/a/b/c/"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c", "/a/b/c/"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_dir(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }

    /// `join_path`: `(parent, child, expected)` triples.
    #[test]
    fn test_join_path() {
        let cases = [
            ("", "", "/"),
            ("/", "", "/"),
            ("", "/", "/"),
            ("/", "/", "/"),
            ("a", "", "a/"),
            ("/", "a", "/a"),
            ("a/b", "c.txt", "a/b/c.txt"),
            ("/a/b", "c.txt", "/a/b/c.txt"),
            ("/a/b", "c/", "/a/b/c/"),
            ("/a/b", "/c/", "/a/b/c/"),
            ("/a/b", "//c.txt", "/a/b/c.txt"),
            (" abc", "/def ", "abc/def"),
            ("//", "/abc", "/abc"),
            ("abc/", "//def", "abc/def"),
        ];

        for (parent, child, expected) in cases {
            assert_eq!(
                expected,
                join_path(parent, child),
                "parent: {parent:?}, child: {child:?}"
            );
        }
    }
}