common_datasource/object_store/
s3.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use object_store::ObjectStore;
18use object_store::services::S3;
19use object_store::util::DefaultLoggingInterceptor;
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
24const ENDPOINT: &str = "endpoint";
25const ACCESS_KEY_ID: &str = "access_key_id";
26const SECRET_ACCESS_KEY: &str = "secret_access_key";
27const SESSION_TOKEN: &str = "session_token";
28const REGION: &str = "region";
29const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
30const DISABLE_EC2_METADATA: &str = "disable_ec2_metadata";
31
32pub fn is_supported_in_s3(key: &str) -> bool {
33    [
34        ENDPOINT,
35        ACCESS_KEY_ID,
36        SECRET_ACCESS_KEY,
37        SESSION_TOKEN,
38        REGION,
39        ENABLE_VIRTUAL_HOST_STYLE,
40        DISABLE_EC2_METADATA,
41    ]
42    .contains(&key)
43}
44
45pub fn build_s3_backend(
46    host: &str,
47    path: &str,
48    connection: &HashMap<String, String>,
49) -> Result<ObjectStore> {
50    let mut builder = S3::default().root(path).bucket(host);
51
52    if let Some(endpoint) = connection.get(ENDPOINT) {
53        builder = builder.endpoint(endpoint);
54    }
55
56    if let Some(region) = connection.get(REGION) {
57        builder = builder.region(region);
58    }
59
60    if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
61        builder = builder.access_key_id(key_id);
62    }
63
64    if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
65        builder = builder.secret_access_key(key);
66    }
67
68    if let Some(session_token) = connection.get(SESSION_TOKEN) {
69        builder = builder.session_token(session_token);
70    }
71
72    if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
73        let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
74            error::InvalidConnectionSnafu {
75                msg: format!(
76                    "failed to parse the option {}={}, {}",
77                    ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
78                ),
79            }
80            .build()
81        })?;
82        if enable {
83            builder = builder.enable_virtual_host_style();
84        }
85    }
86
87    if let Some(disable_str) = connection.get(DISABLE_EC2_METADATA) {
88        let disable = disable_str.as_str().parse::<bool>().map_err(|e| {
89            error::InvalidConnectionSnafu {
90                msg: format!(
91                    "failed to parse the option {}={}, {}",
92                    DISABLE_EC2_METADATA, disable_str, e
93                ),
94            }
95            .build()
96        })?;
97        if disable {
98            builder = builder.disable_ec2_metadata();
99        }
100    }
101
102    // TODO(weny): Consider finding a better way to eliminate duplicate code.
103    Ok(ObjectStore::new(builder)
104        .context(error::BuildBackendSnafu)?
105        .layer(
106            object_store::layers::RetryLayer::new()
107                .with_jitter()
108                .with_notify(object_store::util::PrintDetailedError),
109        )
110        .layer(object_store::layers::LoggingLayer::new(
111            DefaultLoggingInterceptor,
112        ))
113        .layer(object_store::layers::TracingLayer)
114        .layer(object_store::layers::build_prometheus_metrics_layer(true))
115        .finish())
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121    #[test]
122    fn test_is_supported_in_s3() {
123        assert!(is_supported_in_s3(ENDPOINT));
124        assert!(is_supported_in_s3(ACCESS_KEY_ID));
125        assert!(is_supported_in_s3(SECRET_ACCESS_KEY));
126        assert!(is_supported_in_s3(SESSION_TOKEN));
127        assert!(is_supported_in_s3(REGION));
128        assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
129        assert!(is_supported_in_s3(DISABLE_EC2_METADATA));
130        assert!(!is_supported_in_s3("foo"))
131    }
132}