common_datasource/object_store/
s3.rs1use std::collections::HashMap;
16
17use object_store::ObjectStore;
18use object_store::services::S3;
19use object_store::util::DefaultLoggingInterceptor;
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
// Keys recognized in the `connection` option map passed to `build_s3_backend`.

/// Option key whose value is forwarded to the S3 builder's `endpoint`.
const ENDPOINT: &str = "endpoint";
/// Option key whose value is forwarded to the S3 builder's `access_key_id`.
const ACCESS_KEY_ID: &str = "access_key_id";
/// Option key whose value is forwarded to the S3 builder's `secret_access_key`.
const SECRET_ACCESS_KEY: &str = "secret_access_key";
/// Option key whose value is forwarded to the S3 builder's `session_token`.
const SESSION_TOKEN: &str = "session_token";
/// Option key whose value is forwarded to the S3 builder's `region`.
const REGION: &str = "region";
/// Option key holding a boolean (`"true"`/`"false"`); when `true`, the builder's
/// `enable_virtual_host_style` is invoked.
const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
/// Option key holding a boolean (`"true"`/`"false"`); when `true`, the builder's
/// `disable_ec2_metadata` is invoked.
const DISABLE_EC2_METADATA: &str = "disable_ec2_metadata";
31
32pub fn is_supported_in_s3(key: &str) -> bool {
33 [
34 ENDPOINT,
35 ACCESS_KEY_ID,
36 SECRET_ACCESS_KEY,
37 SESSION_TOKEN,
38 REGION,
39 ENABLE_VIRTUAL_HOST_STYLE,
40 DISABLE_EC2_METADATA,
41 ]
42 .contains(&key)
43}
44
45pub fn build_s3_backend(
46 host: &str,
47 path: &str,
48 connection: &HashMap<String, String>,
49) -> Result<ObjectStore> {
50 let mut builder = S3::default().root(path).bucket(host);
51
52 if let Some(endpoint) = connection.get(ENDPOINT) {
53 builder = builder.endpoint(endpoint);
54 }
55
56 if let Some(region) = connection.get(REGION) {
57 builder = builder.region(region);
58 }
59
60 if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
61 builder = builder.access_key_id(key_id);
62 }
63
64 if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
65 builder = builder.secret_access_key(key);
66 }
67
68 if let Some(session_token) = connection.get(SESSION_TOKEN) {
69 builder = builder.session_token(session_token);
70 }
71
72 if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
73 let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
74 error::InvalidConnectionSnafu {
75 msg: format!(
76 "failed to parse the option {}={}, {}",
77 ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
78 ),
79 }
80 .build()
81 })?;
82 if enable {
83 builder = builder.enable_virtual_host_style();
84 }
85 }
86
87 if let Some(disable_str) = connection.get(DISABLE_EC2_METADATA) {
88 let disable = disable_str.as_str().parse::<bool>().map_err(|e| {
89 error::InvalidConnectionSnafu {
90 msg: format!(
91 "failed to parse the option {}={}, {}",
92 DISABLE_EC2_METADATA, disable_str, e
93 ),
94 }
95 .build()
96 })?;
97 if disable {
98 builder = builder.disable_ec2_metadata();
99 }
100 }
101
102 Ok(ObjectStore::new(builder)
104 .context(error::BuildBackendSnafu)?
105 .layer(
106 object_store::layers::RetryLayer::new()
107 .with_jitter()
108 .with_notify(object_store::util::PrintDetailedError),
109 )
110 .layer(object_store::layers::LoggingLayer::new(
111 DefaultLoggingInterceptor,
112 ))
113 .layer(object_store::layers::TracingLayer)
114 .layer(object_store::layers::build_prometheus_metrics_layer(true))
115 .finish())
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Every declared option key is accepted; anything else is rejected.
    #[test]
    fn test_is_supported_in_s3() {
        assert!(is_supported_in_s3(ENDPOINT));
        assert!(is_supported_in_s3(ACCESS_KEY_ID));
        assert!(is_supported_in_s3(SECRET_ACCESS_KEY));
        assert!(is_supported_in_s3(SESSION_TOKEN));
        assert!(is_supported_in_s3(REGION));
        assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
        assert!(is_supported_in_s3(DISABLE_EC2_METADATA));

        assert!(!is_supported_in_s3("foo"));
        // Matching is exact: empty and differently-cased keys are not supported.
        assert!(!is_supported_in_s3(""));
        assert!(!is_supported_in_s3("ENDPOINT"));
    }
}