common_datasource/object_store/
oss.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use object_store::ObjectStore;
18use object_store::services::Oss;
19use object_store::util::{with_instrument_layers, with_retry_layers};
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
24const BUCKET: &str = "bucket";
25const ENDPOINT: &str = "endpoint";
26const ACCESS_KEY_ID: &str = "access_key_id";
27const ACCESS_KEY_SECRET: &str = "access_key_secret";
28const ROOT: &str = "root";
29const ALLOW_ANONYMOUS: &str = "allow_anonymous";
30
31/// Check if the key is supported in OSS configuration.
32pub fn is_supported_in_oss(key: &str) -> bool {
33    [
34        ROOT,
35        ALLOW_ANONYMOUS,
36        BUCKET,
37        ENDPOINT,
38        ACCESS_KEY_ID,
39        ACCESS_KEY_SECRET,
40    ]
41    .contains(&key)
42}
43
44/// Build an OSS backend using the provided bucket, root, and connection parameters.
45pub fn build_oss_backend(
46    bucket: &str,
47    root: &str,
48    connection: &HashMap<String, String>,
49) -> Result<ObjectStore> {
50    let mut builder = Oss::default().bucket(bucket).root(root);
51
52    if let Some(endpoint) = connection.get(ENDPOINT) {
53        builder = builder.endpoint(endpoint);
54    }
55
56    if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
57        builder = builder.access_key_id(access_key_id);
58    }
59
60    if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
61        builder = builder.access_key_secret(access_key_secret);
62    }
63
64    if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
65        let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
66            error::InvalidConnectionSnafu {
67                msg: format!(
68                    "failed to parse the option {}={}, {}",
69                    ALLOW_ANONYMOUS, allow_anonymous, e
70                ),
71            }
72            .build()
73        })?;
74        if allow {
75            builder = builder.allow_anonymous();
76        }
77    }
78
79    let object_store = ObjectStore::new(builder)
80        .context(error::BuildBackendSnafu)?
81        .finish();
82    Ok(with_instrument_layers(
83        with_retry_layers(object_store),
84        true,
85    ))
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    #[test]
93    fn test_is_supported_in_oss() {
94        assert!(is_supported_in_oss(ROOT));
95        assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
96        assert!(is_supported_in_oss(BUCKET));
97        assert!(is_supported_in_oss(ENDPOINT));
98        assert!(is_supported_in_oss(ACCESS_KEY_ID));
99        assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
100        assert!(!is_supported_in_oss("foo"));
101        assert!(!is_supported_in_oss("BAR"));
102    }
103
104    #[test]
105    fn test_build_oss_backend_all_fields_valid() {
106        let mut connection = HashMap::new();
107        connection.insert(
108            ENDPOINT.to_string(),
109            "http://oss-ap-southeast-1.aliyuncs.com".to_string(),
110        );
111        connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
112        connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
113        connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
114
115        let result = build_oss_backend("my-bucket", "my-root", &connection);
116        assert!(result.is_ok());
117    }
118}