1use common_base::secrets::{ExposeSecret, SecretString};
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
25trait IntoField<T> {
29 fn into_field(self) -> T;
30}
31
32impl<T> IntoField<T> for T {
34 fn into_field(self) -> T {
35 self
36 }
37}
38
39impl IntoField<SecretString> for Option<SecretString> {
41 fn into_field(self) -> SecretString {
42 self.unwrap_or_default()
43 }
44}
45
46trait FieldValidator {
52 fn is_empty(&self) -> bool;
54}
55
56impl FieldValidator for String {
58 fn is_empty(&self) -> bool {
59 self.is_empty()
60 }
61}
62
63impl FieldValidator for bool {
65 fn is_empty(&self) -> bool {
66 !self
67 }
68}
69
70impl FieldValidator for Option<String> {
72 fn is_empty(&self) -> bool {
73 self.as_ref().is_none_or(|s| s.is_empty())
74 }
75}
76
77impl FieldValidator for Option<SecretString> {
80 fn is_empty(&self) -> bool {
81 self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
82 }
83}
84
85macro_rules! wrap_with_clap_prefix {
86 (
87 $new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
88 $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $( #[hide = $hide:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
89 }
90 ) => {
91 paste!{
92 #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
93 pub struct $new_name {
94 $(
95 $( #[doc = $doc] )?
96 $( #[clap(alias = $alias)] )?
97 $( #[clap(hide = $hide)] )?
98 #[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
99 pub [<$prefix $field>]: $type,
100 )*
101 }
102
103 impl From<$new_name> for $base {
104 fn from(w: $new_name) -> Self {
105 Self {
106 $( $field: w.[<$prefix $field>].into_field() ),*
108 }
109 }
110 }
111 }
112 };
113}
114
115macro_rules! validate_backend {
156 (
157 enable: $enable:expr,
158 name: $backend_name:expr,
159 required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
160 $(, custom_validator: $custom_validator:expr)?
161 ) => {{
162 if $enable {
163 let mut missing = Vec::new();
165 $(
166 if FieldValidator::is_empty($field) {
167 missing.push($field_name);
168 }
169 )*
170
171 $(
173 $custom_validator(&mut missing);
174 )?
175
176 if !missing.is_empty() {
177 return Err(BoxedError::new(
178 error::MissingConfigSnafu {
179 msg: format!(
180 "{} {} must be set when --{} is enabled.",
181 $backend_name,
182 missing.join(", "),
183 $backend_name.to_lowercase()
184 ),
185 }
186 .build(),
187 ));
188 }
189 }
190
191 Ok(())
192 }};
193}
194
195wrap_with_clap_prefix! {
196 PrefixedAzblobConnection,
197 "azblob-",
198 "enable_azblob",
199 AzblobConnection,
200 {
201 #[doc = "The container of the object store."]
202 container: String = Default::default(),
203 #[doc = "The root of the object store."]
204 root: String = Default::default(),
205 #[doc = "The account name of the object store."]
206 account_name: Option<SecretString>,
207 #[doc = "The account key of the object store."]
208 account_key: Option<SecretString>,
209 #[doc = "The endpoint of the object store."]
210 endpoint: String = Default::default(),
211 #[doc = "The SAS token of the object store."]
212 sas_token: Option<String>,
213 }
214}
215
216impl PrefixedAzblobConnection {
217 pub fn validate(&self) -> Result<(), BoxedError> {
218 validate_backend!(
219 enable: true,
220 name: "AzBlob",
221 required: [
222 (&self.azblob_container, "container"),
223 (&self.azblob_root, "root"),
224 (&self.azblob_account_name, "account name"),
225 (&self.azblob_endpoint, "endpoint"),
226 ],
227 custom_validator: |missing: &mut Vec<&str>| {
228 if self.azblob_sas_token.is_none()
230 && self.azblob_account_key.is_empty()
231 {
232 missing.push("account key (when sas_token is not provided)");
233 }
234 }
235 )
236 }
237}
238
239wrap_with_clap_prefix! {
240 PrefixedS3Connection,
241 "s3-",
242 "enable_s3",
243 S3Connection,
244 {
245 #[doc = "The bucket of the object store."]
246 bucket: String = Default::default(),
247 #[doc = "The root of the object store."]
248 root: String = Default::default(),
249 #[doc = "The access key ID of the object store."]
250 access_key_id: Option<SecretString>,
251 #[doc = "The secret access key of the object store."]
252 secret_access_key: Option<SecretString>,
253 #[doc = "The endpoint of the object store."]
254 endpoint: Option<String>,
255 #[doc = "The region of the object store."]
256 region: Option<String>,
257 #[doc = "Enable virtual host style for the object store."]
258 enable_virtual_host_style: bool = Default::default(),
259 #[doc = "Disable EC2 metadata service for the object store."]
260 disable_ec2_metadata: bool = Default::default(),
261 }
262}
263
264impl PrefixedS3Connection {
265 pub fn validate(&self) -> Result<(), BoxedError> {
266 validate_backend!(
267 enable: true,
268 name: "S3",
269 required: [
270 (&self.s3_bucket, "bucket"),
271 (&self.s3_region, "region"),
272 ]
273 )
274 }
275}
276
277wrap_with_clap_prefix! {
278 PrefixedOssConnection,
279 "oss-",
280 "enable_oss",
281 OssConnection,
282 {
283 #[doc = "The bucket of the object store."]
284 bucket: String = Default::default(),
285 #[doc = "The root of the object store."]
286 root: String = Default::default(),
287 #[doc = "The access key ID of the object store."]
288 access_key_id: Option<SecretString>,
289 #[doc = "The access key secret of the object store."]
290 access_key_secret: Option<SecretString>,
291 #[doc = "The endpoint of the object store."]
292 endpoint: String = Default::default(),
293 }
294}
295
296impl PrefixedOssConnection {
297 pub fn validate(&self) -> Result<(), BoxedError> {
298 validate_backend!(
299 enable: true,
300 name: "OSS",
301 required: [
302 (&self.oss_bucket, "bucket"),
303 (&self.oss_access_key_id, "access key ID"),
304 (&self.oss_access_key_secret, "access key secret"),
305 (&self.oss_endpoint, "endpoint"),
306 ]
307 )
308 }
309}
310
311wrap_with_clap_prefix! {
312 PrefixedGcsConnection,
313 "gcs-",
314 "enable_gcs",
315 GcsConnection,
316 {
317 #[doc = "The root of the object store."]
318 root: String = Default::default(),
319 #[doc = "The bucket of the object store."]
320 bucket: String = Default::default(),
321 #[doc = "The scope of the object store."]
322 scope: String = Default::default(),
323 #[doc = "The credential path of the object store."]
324 #[hide = true]
325 credential_path: Option<SecretString>,
326 #[doc = "The credential of the object store."]
327 credential: Option<SecretString>,
328 #[doc = "The endpoint of the object store."]
329 endpoint: String = Default::default(),
330 }
331}
332
333impl PrefixedGcsConnection {
334 pub fn validate(&self) -> Result<(), BoxedError> {
335 validate_backend!(
336 enable: true,
337 name: "GCS",
338 required: [
339 (&self.gcs_bucket, "bucket"),
340 (&self.gcs_root, "root"),
341 (&self.gcs_scope, "scope"),
342 ]
343 )
347 }
348}
349
350#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
366#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
367pub struct ObjectStoreConfig {
368 #[clap(long = "s3", group = "storage_backend")]
370 pub enable_s3: bool,
371
372 #[clap(flatten)]
373 pub s3: PrefixedS3Connection,
374
375 #[clap(long = "oss", group = "storage_backend")]
377 pub enable_oss: bool,
378
379 #[clap(flatten)]
380 pub oss: PrefixedOssConnection,
381
382 #[clap(long = "gcs", group = "storage_backend")]
384 pub enable_gcs: bool,
385
386 #[clap(flatten)]
387 pub gcs: PrefixedGcsConnection,
388
389 #[clap(long = "azblob", group = "storage_backend")]
391 pub enable_azblob: bool,
392
393 #[clap(flatten)]
394 pub azblob: PrefixedAzblobConnection,
395}
396
397pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
399 let builder = Fs::default().root(root);
400 let object_store = ObjectStore::new(builder)
401 .context(error::InitBackendSnafu)
402 .map_err(BoxedError::new)?
403 .finish();
404
405 Ok(with_instrument_layers(object_store, false))
406}
407
408macro_rules! gen_object_store_builder {
409 ($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
410 pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
411 let config = <$conn_type>::from(self.$field.clone());
412 common_telemetry::info!(
413 "Building object store with {}: {:?}",
414 stringify!($field),
415 config
416 );
417 let object_store = ObjectStore::new(<$service_type>::from(&config))
418 .context(error::InitBackendSnafu)
419 .map_err(BoxedError::new)?
420 .finish();
421 Ok(with_instrument_layers(
422 with_retry_layers(object_store),
423 false,
424 ))
425 }
426 };
427}
428
429impl ObjectStoreConfig {
430 gen_object_store_builder!(build_s3, s3, S3Connection, S3);
431
432 gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
433
434 gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
435
436 gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
437
438 pub fn validate(&self) -> Result<(), BoxedError> {
439 if self.enable_s3 {
440 self.s3.validate()?;
441 }
442 if self.enable_oss {
443 self.oss.validate()?;
444 }
445 if self.enable_gcs {
446 self.gcs.validate()?;
447 }
448 if self.enable_azblob {
449 self.azblob.validate()?;
450 }
451 Ok(())
452 }
453
454 pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
456 self.validate()?;
457
458 if self.enable_s3 {
459 self.build_s3().map(Some)
460 } else if self.enable_oss {
461 self.build_oss().map(Some)
462 } else if self.enable_gcs {
463 self.build_gcs().map(Some)
464 } else if self.enable_azblob {
465 self.build_azblob().map(Some)
466 } else {
467 Ok(None)
468 }
469 }
470}