common_datasource/object_store/
s3.rs1use std::collections::HashMap;
16
17use object_store::services::S3;
18use object_store::util::DefaultLoggingInterceptor;
19use object_store::ObjectStore;
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
24const ENDPOINT: &str = "endpoint";
25const ACCESS_KEY_ID: &str = "access_key_id";
26const SECRET_ACCESS_KEY: &str = "secret_access_key";
27const SESSION_TOKEN: &str = "session_token";
28const REGION: &str = "region";
29const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
30
31pub fn is_supported_in_s3(key: &str) -> bool {
32 [
33 ENDPOINT,
34 ACCESS_KEY_ID,
35 SECRET_ACCESS_KEY,
36 SESSION_TOKEN,
37 REGION,
38 ENABLE_VIRTUAL_HOST_STYLE,
39 ]
40 .contains(&key)
41}
42
43pub fn build_s3_backend(
44 host: &str,
45 path: &str,
46 connection: &HashMap<String, String>,
47) -> Result<ObjectStore> {
48 let mut builder = S3::default().root(path).bucket(host);
49
50 if let Some(endpoint) = connection.get(ENDPOINT) {
51 builder = builder.endpoint(endpoint);
52 }
53
54 if let Some(region) = connection.get(REGION) {
55 builder = builder.region(region);
56 }
57
58 if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
59 builder = builder.access_key_id(key_id);
60 }
61
62 if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
63 builder = builder.secret_access_key(key);
64 }
65
66 if let Some(session_token) = connection.get(SESSION_TOKEN) {
67 builder = builder.session_token(session_token);
68 }
69
70 if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
71 let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
72 error::InvalidConnectionSnafu {
73 msg: format!(
74 "failed to parse the option {}={}, {}",
75 ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
76 ),
77 }
78 .build()
79 })?;
80 if enable {
81 builder = builder.enable_virtual_host_style();
82 }
83 }
84
85 Ok(ObjectStore::new(builder)
87 .context(error::BuildBackendSnafu)?
88 .layer(object_store::layers::LoggingLayer::new(
89 DefaultLoggingInterceptor,
90 ))
91 .layer(object_store::layers::TracingLayer)
92 .layer(object_store::layers::build_prometheus_metrics_layer(true))
93 .finish())
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99 #[test]
100 fn test_is_supported_in_s3() {
101 assert!(is_supported_in_s3(ENDPOINT));
102 assert!(is_supported_in_s3(ACCESS_KEY_ID));
103 assert!(is_supported_in_s3(SECRET_ACCESS_KEY));
104 assert!(is_supported_in_s3(SESSION_TOKEN));
105 assert!(is_supported_in_s3(REGION));
106 assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
107 assert!(!is_supported_in_s3("foo"))
108 }
109}