common_datasource/object_store/
oss.rs1use std::collections::HashMap;
16
17use object_store::services::Oss;
18use object_store::ObjectStore;
19use snafu::ResultExt;
20
21use crate::error::{self, Result};
22
/// Connection option key: name of the OSS bucket.
const BUCKET: &str = "bucket";
/// Connection option key: endpoint of the OSS service.
const ENDPOINT: &str = "endpoint";
/// Connection option key: access key id used to authenticate.
const ACCESS_KEY_ID: &str = "access_key_id";
/// Connection option key: access key secret used to authenticate.
const ACCESS_KEY_SECRET: &str = "access_key_secret";
/// Connection option key: root path inside the bucket.
const ROOT: &str = "root";
/// Connection option key: boolean flag (`"true"` / `"false"`) enabling anonymous access.
const ALLOW_ANONYMOUS: &str = "allow_anonymous";

/// Returns `true` when `key` is one of the connection option names recognised
/// for the OSS backend.
///
/// The comparison is exact: keys are case-sensitive and are not trimmed.
pub fn is_supported_in_oss(key: &str) -> bool {
    matches!(
        key,
        ROOT | ALLOW_ANONYMOUS | BUCKET | ENDPOINT | ACCESS_KEY_ID | ACCESS_KEY_SECRET
    )
}
42
43pub fn build_oss_backend(
45 bucket: &str,
46 root: &str,
47 connection: &HashMap<String, String>,
48) -> Result<ObjectStore> {
49 let mut builder = Oss::default().bucket(bucket).root(root);
50
51 if let Some(endpoint) = connection.get(ENDPOINT) {
52 builder = builder.endpoint(endpoint);
53 }
54
55 if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) {
56 builder = builder.access_key_id(access_key_id);
57 }
58
59 if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) {
60 builder = builder.access_key_secret(access_key_secret);
61 }
62
63 if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
64 let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
65 error::InvalidConnectionSnafu {
66 msg: format!(
67 "failed to parse the option {}={}, {}",
68 ALLOW_ANONYMOUS, allow_anonymous, e
69 ),
70 }
71 .build()
72 })?;
73 if allow {
74 builder = builder.allow_anonymous();
75 }
76 }
77
78 let op = ObjectStore::new(builder)
79 .context(error::BuildBackendSnafu)?
80 .layer(
81 object_store::layers::RetryLayer::new()
82 .with_jitter()
83 .with_notify(object_store::util::PrintDetailedError),
84 )
85 .layer(object_store::layers::LoggingLayer::default())
86 .layer(object_store::layers::TracingLayer)
87 .layer(object_store::layers::build_prometheus_metrics_layer(true))
88 .finish();
89
90 Ok(op)
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// Every declared option key is accepted; unknown keys are rejected.
    #[test]
    fn test_is_supported_in_oss() {
        let accepted = [
            ROOT,
            ALLOW_ANONYMOUS,
            BUCKET,
            ENDPOINT,
            ACCESS_KEY_ID,
            ACCESS_KEY_SECRET,
        ];
        for key in &accepted {
            assert!(is_supported_in_oss(key));
        }

        let rejected = ["foo", "BAR"];
        for key in &rejected {
            assert!(!is_supported_in_oss(key));
        }
    }

    /// Building succeeds when every optional connection field is valid.
    #[test]
    fn test_build_oss_backend_all_fields_valid() {
        let options = [
            (ENDPOINT, "http://oss-ap-southeast-1.aliyuncs.com"),
            (ACCESS_KEY_ID, "key_id"),
            (ACCESS_KEY_SECRET, "key_secret"),
            (ALLOW_ANONYMOUS, "true"),
        ];
        let connection: HashMap<String, String> = options
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let result = build_oss_backend("my-bucket", "my-root", &connection);
        assert!(result.is_ok());
    }
}