1use std::fmt::Display;
16
17use common_telemetry::{debug, error, trace};
18use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
19use opendal::raw::{AccessorInfo, Operation};
20use opendal::ErrorKind;
21
22use crate::ObjectStore;
23
24pub fn join_dir(parent: &str, child: &str) -> String {
32 let output = format!("{parent}/{child}/");
34 normalize_dir(&output)
35}
36
37pub fn normalize_dir(v: &str) -> String {
57 let has_root = v.starts_with('/');
58 let mut v = v
59 .split('/')
60 .filter(|v| !v.is_empty())
61 .collect::<Vec<&str>>()
62 .join("/");
63 if has_root {
64 v.insert(0, '/');
65 }
66 if !v.ends_with('/') {
67 v.push('/')
68 }
69 v
70}
71
72pub fn join_path(parent: &str, child: &str) -> String {
77 let output = format!("{parent}/{child}");
78 normalize_path(&output)
79}
80
81pub fn normalize_path(path: &str) -> String {
93 let path = path.trim();
95
96 if path.is_empty() {
98 return "/".to_string();
99 }
100
101 let has_leading = path.starts_with('/');
102 let has_trailing = path.ends_with('/');
103
104 let mut p = path
105 .split('/')
106 .filter(|v| !v.is_empty())
107 .collect::<Vec<_>>()
108 .join("/");
109
110 if !p.starts_with('/') && has_leading {
112 p.insert(0, '/');
113 }
114
115 if !p.ends_with('/') && has_trailing {
117 p.push('/');
118 }
119
120 p
121}
122
123pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
125 object_store
126 .layer(LoggingLayer::new(DefaultLoggingInterceptor))
127 .layer(TracingLayer)
128 .layer(crate::layers::build_prometheus_metrics_layer(path_label))
129}
130
131static LOGGING_TARGET: &str = "opendal::services";
132
133struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
134
135impl Display for LoggingContext<'_> {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 for (i, (k, v)) in self.0.iter().enumerate() {
138 if i > 0 {
139 write!(f, " {}={}", k, v)?;
140 } else {
141 write!(f, "{}={}", k, v)?;
142 }
143 }
144 Ok(())
145 }
146}
147
148#[derive(Debug, Copy, Clone, Default)]
149pub struct DefaultLoggingInterceptor;
150
151impl LoggingInterceptor for DefaultLoggingInterceptor {
152 #[inline]
153 fn log(
154 &self,
155 info: &AccessorInfo,
156 operation: Operation,
157 context: &[(&str, &str)],
158 message: &str,
159 err: Option<&opendal::Error>,
160 ) {
161 if let Some(err) = err {
162 if err.kind() == ErrorKind::Unexpected {
164 error!(
165 target: LOGGING_TARGET,
166 "service={} name={} {}: {operation} {message} {err:#?}",
167 info.scheme(),
168 info.name(),
169 LoggingContext(context),
170 );
171 } else {
172 debug!(
173 target: LOGGING_TARGET,
174 "service={} name={} {}: {operation} {message} {err}",
175 info.scheme(),
176 info.name(),
177 LoggingContext(context),
178 );
179 };
180 }
181
182 if operation.is_oneshot() {
184 debug!(
185 target: LOGGING_TARGET,
186 "service={} name={} {}: {operation} {message}",
187 info.scheme(),
188 info.name(),
189 LoggingContext(context),
190 );
191 } else {
192 trace!(
193 target: LOGGING_TARGET,
194 "service={} name={} {}: {operation} {message}",
195 info.scheme(),
196 info.name(),
197 LoggingContext(context),
198 );
199 };
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206
207 #[test]
208 fn test_normalize_dir() {
209 assert_eq!("/", normalize_dir("/"));
210 assert_eq!("/", normalize_dir(""));
211 assert_eq!("/test/", normalize_dir("/test"));
212 }
213
214 #[test]
215 fn test_join_dir() {
216 assert_eq!("/", join_dir("", ""));
217 assert_eq!("/", join_dir("/", ""));
218 assert_eq!("/", join_dir("", "/"));
219 assert_eq!("/", join_dir("/", "/"));
220 assert_eq!("/a/", join_dir("/a", ""));
221 assert_eq!("a/b/c/", join_dir("a/b", "c"));
222 assert_eq!("/a/b/c/", join_dir("/a/b", "c"));
223 assert_eq!("/a/b/c/", join_dir("/a/b", "c/"));
224 assert_eq!("/a/b/c/", join_dir("/a/b", "/c/"));
225 assert_eq!("/a/b/c/", join_dir("/a/b", "//c"));
226 }
227
228 #[test]
229 fn test_join_path() {
230 assert_eq!("/", join_path("", ""));
231 assert_eq!("/", join_path("/", ""));
232 assert_eq!("/", join_path("", "/"));
233 assert_eq!("/", join_path("/", "/"));
234 assert_eq!("a/", join_path("a", ""));
235 assert_eq!("/a", join_path("/", "a"));
236 assert_eq!("a/b/c.txt", join_path("a/b", "c.txt"));
237 assert_eq!("/a/b/c.txt", join_path("/a/b", "c.txt"));
238 assert_eq!("/a/b/c/", join_path("/a/b", "c/"));
239 assert_eq!("/a/b/c/", join_path("/a/b", "/c/"));
240 assert_eq!("/a/b/c.txt", join_path("/a/b", "//c.txt"));
241 assert_eq!("abc/def", join_path(" abc", "/def "));
242 assert_eq!("/abc", join_path("//", "/abc"));
243 assert_eq!("abc/def", join_path("abc/", "//def"));
244 }
245}