common_datasource/object_store/
oss.rs1use std::collections::HashMap;
16
17use object_store::ObjectStore;
18use object_store::services::Oss;
19use object_store::util::{with_instrument_layers, with_retry_layers};
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23
24const BUCKET: &str = "bucket";
25const ENDPOINT: &str = "endpoint";
26const ACCESS_KEY_ID: &str = "access_key_id";
27const ACCESS_KEY_SECRET: &str = "access_key_secret";
28const ROOT: &str = "root";
29const ALLOW_ANONYMOUS: &str = "allow_anonymous";
30
31pub fn is_supported_in_oss(key: &str) -> bool {
33 [
34 ROOT,
35 ALLOW_ANONYMOUS,
36 BUCKET,
37 ENDPOINT,
38 ACCESS_KEY_ID,
39 ACCESS_KEY_SECRET,
40 ]
41 .contains(&key)
42}
43
44pub fn build_oss_backend(
46 bucket: &str,
47 root: &str,
48 connection: &HashMap<String, String>,
49) -> Result<ObjectStore> {
50 let mut builder = Oss::default().bucket(bucket).root(root);
51
52 if let Some(endpoint) = connection.get(ENDPOINT) {
53 builder = builder.endpoint(endpoint);
54 }
55
56 if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
57 builder = builder.access_key_id(access_key_id);
58 }
59
60 if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
61 builder = builder.access_key_secret(access_key_secret);
62 }
63
64 if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
65 let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
66 error::InvalidConnectionSnafu {
67 msg: format!(
68 "failed to parse the option {}={}, {}",
69 ALLOW_ANONYMOUS, allow_anonymous, e
70 ),
71 }
72 .build()
73 })?;
74 if allow {
75 builder = builder.allow_anonymous();
76 }
77 }
78
79 let object_store = ObjectStore::new(builder)
80 .context(error::BuildBackendSnafu)?
81 .finish();
82 Ok(with_instrument_layers(
83 with_retry_layers(object_store),
84 true,
85 ))
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91
92 #[test]
93 fn test_is_supported_in_oss() {
94 assert!(is_supported_in_oss(ROOT));
95 assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
96 assert!(is_supported_in_oss(BUCKET));
97 assert!(is_supported_in_oss(ENDPOINT));
98 assert!(is_supported_in_oss(ACCESS_KEY_ID));
99 assert!(is_supported_in_oss(ACCESS_KEY_SECRET));
100 assert!(!is_supported_in_oss("foo"));
101 assert!(!is_supported_in_oss("BAR"));
102 }
103
104 #[test]
105 fn test_build_oss_backend_all_fields_valid() {
106 let mut connection = HashMap::new();
107 connection.insert(
108 ENDPOINT.to_string(),
109 "http://oss-ap-southeast-1.aliyuncs.com".to_string(),
110 );
111 connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string());
112 connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string());
113 connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string());
114
115 let result = build_oss_backend("my-bucket", "my-root", &connection);
116 assert!(result.is_ok());
117 }
118}