common_datasource/object_store/
s3.rs1use std::collections::HashMap;
16
17use object_store::services::S3;
18use object_store::util::DefaultLoggingInterceptor;
19use object_store::ObjectStore;
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
24const ENDPOINT: &str = "endpoint";
25const ACCESS_KEY_ID: &str = "access_key_id";
26const SECRET_ACCESS_KEY: &str = "secret_access_key";
27const SESSION_TOKEN: &str = "session_token";
28const REGION: &str = "region";
29const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
30
31pub fn is_supported_in_s3(key: &str) -> bool {
32 [
33 ENDPOINT,
34 ACCESS_KEY_ID,
35 SECRET_ACCESS_KEY,
36 SESSION_TOKEN,
37 REGION,
38 ENABLE_VIRTUAL_HOST_STYLE,
39 ]
40 .contains(&key)
41}
42
43pub fn build_s3_backend(
44 host: &str,
45 path: &str,
46 connection: &HashMap<String, String>,
47) -> Result<ObjectStore> {
48 let mut builder = S3::default().root(path).bucket(host);
49
50 if let Some(endpoint) = connection.get(ENDPOINT) {
51 builder = builder.endpoint(endpoint);
52 }
53
54 if let Some(region) = connection.get(REGION) {
55 builder = builder.region(region);
56 }
57
58 if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
59 builder = builder.access_key_id(key_id);
60 }
61
62 if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
63 builder = builder.secret_access_key(key);
64 }
65
66 if let Some(session_token) = connection.get(SESSION_TOKEN) {
67 builder = builder.session_token(session_token);
68 }
69
70 if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
71 let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
72 error::InvalidConnectionSnafu {
73 msg: format!(
74 "failed to parse the option {}={}, {}",
75 ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
76 ),
77 }
78 .build()
79 })?;
80 if enable {
81 builder = builder.enable_virtual_host_style();
82 }
83 }
84
85 Ok(ObjectStore::new(builder)
87 .context(error::BuildBackendSnafu)?
88 .layer(
89 object_store::layers::RetryLayer::new()
90 .with_jitter()
91 .with_notify(object_store::util::PrintDetailedError),
92 )
93 .layer(object_store::layers::LoggingLayer::new(
94 DefaultLoggingInterceptor,
95 ))
96 .layer(object_store::layers::TracingLayer)
97 .layer(object_store::layers::build_prometheus_metrics_layer(true))
98 .finish())
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 #[test]
105 fn test_is_supported_in_s3() {
106 assert!(is_supported_in_s3(ENDPOINT));
107 assert!(is_supported_in_s3(ACCESS_KEY_ID));
108 assert!(is_supported_in_s3(SECRET_ACCESS_KEY));
109 assert!(is_supported_in_s3(SESSION_TOKEN));
110 assert!(is_supported_in_s3(REGION));
111 assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
112 assert!(!is_supported_in_s3("foo"))
113 }
114}