common_datasource/object_store/
s3.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use object_store::services::S3;
18use object_store::util::DefaultLoggingInterceptor;
19use object_store::ObjectStore;
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
/// Connection option key; when present, its value is passed to the S3 builder as the endpoint.
const ENDPOINT: &str = "endpoint";
/// Connection option key; when present, its value is passed to the S3 builder as the access key id.
const ACCESS_KEY_ID: &str = "access_key_id";
/// Connection option key; when present, its value is passed to the S3 builder as the secret access key.
const SECRET_ACCESS_KEY: &str = "secret_access_key";
/// Connection option key; when present, its value is passed to the S3 builder as the session token.
const SESSION_TOKEN: &str = "session_token";
/// Connection option key; when present, its value is passed to the S3 builder as the region.
const REGION: &str = "region";
/// Connection option key; its value is parsed as a `bool`, and `true` turns on
/// virtual-host-style addressing on the S3 builder.
const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
30
31pub fn is_supported_in_s3(key: &str) -> bool {
32    [
33        ENDPOINT,
34        ACCESS_KEY_ID,
35        SECRET_ACCESS_KEY,
36        SESSION_TOKEN,
37        REGION,
38        ENABLE_VIRTUAL_HOST_STYLE,
39    ]
40    .contains(&key)
41}
42
43pub fn build_s3_backend(
44    host: &str,
45    path: &str,
46    connection: &HashMap<String, String>,
47) -> Result<ObjectStore> {
48    let mut builder = S3::default().root(path).bucket(host);
49
50    if let Some(endpoint) = connection.get(ENDPOINT) {
51        builder = builder.endpoint(endpoint);
52    }
53
54    if let Some(region) = connection.get(REGION) {
55        builder = builder.region(region);
56    }
57
58    if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
59        builder = builder.access_key_id(key_id);
60    }
61
62    if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
63        builder = builder.secret_access_key(key);
64    }
65
66    if let Some(session_token) = connection.get(SESSION_TOKEN) {
67        builder = builder.session_token(session_token);
68    }
69
70    if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
71        let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
72            error::InvalidConnectionSnafu {
73                msg: format!(
74                    "failed to parse the option {}={}, {}",
75                    ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
76                ),
77            }
78            .build()
79        })?;
80        if enable {
81            builder = builder.enable_virtual_host_style();
82        }
83    }
84
85    // TODO(weny): Consider finding a better way to eliminate duplicate code.
86    Ok(ObjectStore::new(builder)
87        .context(error::BuildBackendSnafu)?
88        .layer(object_store::layers::LoggingLayer::new(
89            DefaultLoggingInterceptor,
90        ))
91        .layer(object_store::layers::TracingLayer)
92        .layer(object_store::layers::build_prometheus_metrics_layer(true))
93        .finish())
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// Every declared option key must be accepted, and an unrelated key must
    /// be rejected.
    #[test]
    fn test_is_supported_in_s3() {
        let supported_keys = [
            ENDPOINT,
            ACCESS_KEY_ID,
            SECRET_ACCESS_KEY,
            SESSION_TOKEN,
            REGION,
            ENABLE_VIRTUAL_HOST_STYLE,
        ];
        for option_key in supported_keys.iter().copied() {
            assert!(is_supported_in_s3(option_key));
        }

        assert!(!is_supported_in_s3("foo"));
    }
}
109}