cli/common/
object_store.rs1use common_base::secrets::SecretString;
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
/// Declares a clap-parsable wrapper struct around a connection type.
///
/// Invocation shape: `Wrapper, "prefix", Target, { field: Type [= default], ... }`.
/// Each listed field may carry an optional `#[doc = ...]` and an optional
/// `#[alias = ...]` marker (in that order) before its name.
///
/// The expansion produces:
/// * `pub struct Wrapper` deriving `clap::Parser`, `Debug`, `Clone`,
///   `PartialEq` and `Default`, with one `#[clap(long)]` field per listed
///   field. The field identifier is built by `paste` from the prefix literal
///   followed by the field name; a given default is forwarded as
///   `default_value_t`.
/// * `impl From<Wrapper> for Target`, which moves every prefixed field into the
///   target field of the unprefixed name.
///
/// NOTE(review): the prefixes used in this file end with `-`; the identifier
/// that results depends on how `paste` treats that character inside `[< >]` --
/// confirm against the `paste` documentation.
macro_rules! wrap_with_clap_prefix {
    (
        $wrapper:ident, $prefix:literal, $target:ty, {
            $(
                $( #[doc = $help:expr] )?
                $( #[alias = $alias:literal] )?
                $name:ident : $ty:ty $( = $fallback:expr )?
            ),* $(,)?
        }
    ) => {
        paste! {
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $wrapper {
                $(
                    $( #[doc = $help] )?
                    $( #[clap(alias = $alias)] )?
                    #[clap(long $(, default_value_t = $fallback )? )]
                    [<$prefix $name>]: $ty,
                )*
            }

            impl From<$wrapper> for $target {
                fn from(prefixed: $wrapper) -> Self {
                    // Strip the prefix again: each target field is filled from
                    // its prefixed counterpart on the wrapper.
                    Self {
                        $( $name: prefixed.[<$prefix $name>] ),*
                    }
                }
            }
        }
    };
}
52
53wrap_with_clap_prefix! {
54 PrefixedAzblobConnection,
55 "azblob-",
56 AzblobConnection,
57 {
58 #[doc = "The container of the object store."]
59 container: String = Default::default(),
60 #[doc = "The root of the object store."]
61 root: String = Default::default(),
62 #[doc = "The account name of the object store."]
63 account_name: SecretString = Default::default(),
64 #[doc = "The account key of the object store."]
65 account_key: SecretString = Default::default(),
66 #[doc = "The endpoint of the object store."]
67 endpoint: String = Default::default(),
68 #[doc = "The SAS token of the object store."]
69 sas_token: Option<String>,
70 }
71}
72
73wrap_with_clap_prefix! {
74 PrefixedS3Connection,
75 "s3-",
76 S3Connection,
77 {
78 #[doc = "The bucket of the object store."]
79 bucket: String = Default::default(),
80 #[doc = "The root of the object store."]
81 root: String = Default::default(),
82 #[doc = "The access key ID of the object store."]
83 access_key_id: SecretString = Default::default(),
84 #[doc = "The secret access key of the object store."]
85 secret_access_key: SecretString = Default::default(),
86 #[doc = "The endpoint of the object store."]
87 endpoint: Option<String>,
88 #[doc = "The region of the object store."]
89 region: Option<String>,
90 #[doc = "Enable virtual host style for the object store."]
91 enable_virtual_host_style: bool = Default::default(),
92 }
93}
94
95wrap_with_clap_prefix! {
96 PrefixedOssConnection,
97 "oss-",
98 OssConnection,
99 {
100 #[doc = "The bucket of the object store."]
101 bucket: String = Default::default(),
102 #[doc = "The root of the object store."]
103 root: String = Default::default(),
104 #[doc = "The access key ID of the object store."]
105 access_key_id: SecretString = Default::default(),
106 #[doc = "The access key secret of the object store."]
107 access_key_secret: SecretString = Default::default(),
108 #[doc = "The endpoint of the object store."]
109 endpoint: String = Default::default(),
110 }
111}
112
113wrap_with_clap_prefix! {
114 PrefixedGcsConnection,
115 "gcs-",
116 GcsConnection,
117 {
118 #[doc = "The root of the object store."]
119 root: String = Default::default(),
120 #[doc = "The bucket of the object store."]
121 bucket: String = Default::default(),
122 #[doc = "The scope of the object store."]
123 scope: String = Default::default(),
124 #[doc = "The credential path of the object store."]
125 credential_path: SecretString = Default::default(),
126 #[doc = "The credential of the object store."]
127 credential: SecretString = Default::default(),
128 #[doc = "The endpoint of the object store."]
129 endpoint: String = Default::default(),
130 }
131}
132
133#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
135pub struct ObjectStoreConfig {
136 #[clap(long, alias = "s3")]
138 pub enable_s3: bool,
139
140 #[clap(flatten)]
141 pub s3: PrefixedS3Connection,
142
143 #[clap(long, alias = "oss")]
145 pub enable_oss: bool,
146
147 #[clap(flatten)]
148 pub oss: PrefixedOssConnection,
149
150 #[clap(long, alias = "gcs")]
152 pub enable_gcs: bool,
153
154 #[clap(flatten)]
155 pub gcs: PrefixedGcsConnection,
156
157 #[clap(long, alias = "azblob")]
159 pub enable_azblob: bool,
160
161 #[clap(flatten)]
162 pub azblob: PrefixedAzblobConnection,
163}
164
165pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
167 let builder = Fs::default().root(root);
168 let object_store = ObjectStore::new(builder)
169 .context(error::InitBackendSnafu)
170 .map_err(BoxedError::new)?
171 .finish();
172
173 Ok(with_instrument_layers(object_store, false))
174}
175
176impl ObjectStoreConfig {
177 pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
179 let object_store = if self.enable_s3 {
180 let s3 = S3Connection::from(self.s3.clone());
181 common_telemetry::info!("Building object store with s3: {:?}", s3);
182 Some(
183 ObjectStore::new(S3::from(&s3))
184 .context(error::InitBackendSnafu)
185 .map_err(BoxedError::new)?
186 .finish(),
187 )
188 } else if self.enable_oss {
189 let oss = OssConnection::from(self.oss.clone());
190 common_telemetry::info!("Building object store with oss: {:?}", oss);
191 Some(
192 ObjectStore::new(Oss::from(&oss))
193 .context(error::InitBackendSnafu)
194 .map_err(BoxedError::new)?
195 .finish(),
196 )
197 } else if self.enable_gcs {
198 let gcs = GcsConnection::from(self.gcs.clone());
199 common_telemetry::info!("Building object store with gcs: {:?}", gcs);
200 Some(
201 ObjectStore::new(Gcs::from(&gcs))
202 .context(error::InitBackendSnafu)
203 .map_err(BoxedError::new)?
204 .finish(),
205 )
206 } else if self.enable_azblob {
207 let azblob = AzblobConnection::from(self.azblob.clone());
208 common_telemetry::info!("Building object store with azblob: {:?}", azblob);
209 Some(
210 ObjectStore::new(Azblob::from(&azblob))
211 .context(error::InitBackendSnafu)
212 .map_err(BoxedError::new)?
213 .finish(),
214 )
215 } else {
216 None
217 };
218
219 let object_store = object_store
220 .map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));
221
222 Ok(object_store)
223 }
224}