// common_datasource/object_store/s3.rs
use std::collections::HashMap;

use object_store::ObjectStore;
use object_store::services::S3;
use object_store::util::{with_instrument_layers, with_retry_layers};
use snafu::ResultExt;

use crate::error::{self, Result};

/// Connection option key whose value is passed to the S3 builder's `endpoint`.
const ENDPOINT: &str = "endpoint";
/// Connection option key whose value is passed to the S3 builder's `access_key_id`.
const ACCESS_KEY_ID: &str = "access_key_id";
/// Connection option key whose value is passed to the S3 builder's `secret_access_key`.
const SECRET_ACCESS_KEY: &str = "secret_access_key";
/// Connection option key whose value is passed to the S3 builder's `session_token`.
const SESSION_TOKEN: &str = "session_token";
/// Connection option key whose value is passed to the S3 builder's `region`.
const REGION: &str = "region";
/// Connection option key holding a `bool`; when it parses to `true`, the S3
/// builder's `enable_virtual_host_style` is invoked.
const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
/// Connection option key holding a `bool`; when it parses to `true`, the S3
/// builder's `disable_ec2_metadata` is invoked.
const DISABLE_EC2_METADATA: &str = "disable_ec2_metadata";

32pub fn is_supported_in_s3(key: &str) -> bool {
33 [
34 ENDPOINT,
35 ACCESS_KEY_ID,
36 SECRET_ACCESS_KEY,
37 SESSION_TOKEN,
38 REGION,
39 ENABLE_VIRTUAL_HOST_STYLE,
40 DISABLE_EC2_METADATA,
41 ]
42 .contains(&key)
43}
44
45pub fn build_s3_backend(
46 host: &str,
47 path: &str,
48 connection: &HashMap<String, String>,
49) -> Result<ObjectStore> {
50 let mut builder = S3::default().root(path).bucket(host);
51
52 if let Some(endpoint) = connection.get(ENDPOINT) {
53 builder = builder.endpoint(endpoint);
54 }
55
56 if let Some(region) = connection.get(REGION) {
57 builder = builder.region(region);
58 }
59
60 if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
61 builder = builder.access_key_id(key_id);
62 }
63
64 if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
65 builder = builder.secret_access_key(key);
66 }
67
68 if let Some(session_token) = connection.get(SESSION_TOKEN) {
69 builder = builder.session_token(session_token);
70 }
71
72 if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
73 let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
74 error::InvalidConnectionSnafu {
75 msg: format!(
76 "failed to parse the option {}={}, {}",
77 ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
78 ),
79 }
80 .build()
81 })?;
82 if enable {
83 builder = builder.enable_virtual_host_style();
84 }
85 }
86
87 if let Some(disable_str) = connection.get(DISABLE_EC2_METADATA) {
88 let disable = disable_str.as_str().parse::<bool>().map_err(|e| {
89 error::InvalidConnectionSnafu {
90 msg: format!(
91 "failed to parse the option {}={}, {}",
92 DISABLE_EC2_METADATA, disable_str, e
93 ),
94 }
95 .build()
96 })?;
97 if disable {
98 builder = builder.disable_ec2_metadata();
99 }
100 }
101
102 let object_store = ObjectStore::new(builder)
103 .context(error::BuildBackendSnafu)?
104 .finish();
105 Ok(with_instrument_layers(
106 with_retry_layers(object_store),
107 true,
108 ))
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Every declared option key must be reported as supported.
    #[test]
    fn test_is_supported_in_s3() {
        assert!(is_supported_in_s3(ENDPOINT));
        assert!(is_supported_in_s3(ACCESS_KEY_ID));
        assert!(is_supported_in_s3(SECRET_ACCESS_KEY));
        assert!(is_supported_in_s3(SESSION_TOKEN));
        assert!(is_supported_in_s3(REGION));
        assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
        assert!(is_supported_in_s3(DISABLE_EC2_METADATA));
        assert!(!is_supported_in_s3("foo"));
    }

    /// Keys that merely resemble a supported option must be rejected.
    #[test]
    fn test_is_not_supported_in_s3() {
        assert!(!is_supported_in_s3(""));
        assert!(!is_supported_in_s3("ENDPOINT"));
        assert!(!is_supported_in_s3("region "));
    }
}