1use common_base::secrets::{ExposeSecret, SecretString};
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
/// Converts a CLI-side field value into the type of the matching field on the target
/// connection struct.
///
/// The `From` impl generated by `wrap_with_clap_prefix!` calls `into_field()` on every
/// field, so a CLI field may have a different type than the target field as long as an
/// `IntoField` impl bridges the two.
trait IntoField<T> {
    /// Consumes `self` and returns it as a `T`.
    fn into_field(self) -> T;
}
31
/// Identity conversion: a value whose type already matches the target field is passed
/// through unchanged.
impl<T> IntoField<T> for T {
    fn into_field(self) -> T {
        self
    }
}
38
39impl IntoField<SecretString> for Option<SecretString> {
41 fn into_field(self) -> SecretString {
42 self.unwrap_or_default()
43 }
44}
45
/// Tells whether a configuration field carries no meaningful value.
///
/// `validate_backend!` calls this on every required field to decide whether the field
/// must be reported as missing.
trait FieldValidator {
    /// Returns `true` when the field should be treated as "not provided".
    fn is_empty(&self) -> bool;
}
55
56impl FieldValidator for String {
58 fn is_empty(&self) -> bool {
59 self.is_empty()
60 }
61}
62
63impl FieldValidator for bool {
65 fn is_empty(&self) -> bool {
66 !self
67 }
68}
69
70impl FieldValidator for Option<String> {
72 fn is_empty(&self) -> bool {
73 self.as_ref().is_none_or(|s| s.is_empty())
74 }
75}
76
77impl FieldValidator for Option<SecretString> {
80 fn is_empty(&self) -> bool {
81 self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
82 }
83}
84
/// Generates a clap-parsable wrapper around a connection struct whose options all share
/// a common prefix.
///
/// Arguments: the wrapper name `$new_name`, the `$prefix` literal, the `$enable_flag`
/// argument id, the wrapped `$base` type, and a braced list of
/// `field: Type [= default]` entries. Each entry may be preceded by `#[doc = ...]` and
/// then `#[alias = ...]`, in that order.
///
/// It emits:
///
/// * `pub struct $new_name` deriving `clap::Parser`, `Debug`, `Clone`, `PartialEq` and
///   `Default`. It has one public field per entry, named by pasting `$prefix` and the
///   field name together (this file accesses e.g. `azblob_container` for the prefix
///   `"azblob-"`). Every field is a `long` option that `requires` `$enable_flag`; an
///   optional `= default` becomes `default_value_t`.
/// * `impl From<$new_name> for $base`, which moves every prefixed field into the
///   same-named un-prefixed field of `$base` through `IntoField::into_field`.
macro_rules! wrap_with_clap_prefix {
    (
        $new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
            $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
        }
    ) => {
        paste!{
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $new_name {
                $(
                    // Forward the optional doc string and alias to the generated field.
                    $( #[doc = $doc] )?
                    $( #[clap(alias = $alias)] )?
                    #[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
                    pub [<$prefix $field>]: $type,
                )*
            }

            impl From<$new_name> for $base {
                fn from(w: $new_name) -> Self {
                    Self {
                        // `into_field()` bridges CLI field types that differ from the
                        // target's (e.g. `Option<SecretString>` -> `SecretString`).
                        $( $field: w.[<$prefix $field>].into_field() ),*
                    }
                }
            }
        }
    };
}
113
/// Declaratively validates the required options of a storage backend.
///
/// Expands to a block that, when `$enable` is true:
///
/// 1. collects the display name of every `required` entry whose field reports
///    `FieldValidator::is_empty`,
/// 2. lets the optional `custom_validator` append further names (it is called with
///    `&mut` access to the collected list), and
/// 3. if anything was collected, `return`s from the *enclosing function* with a
///    `BoxedError` wrapping `error::MissingConfigSnafu`.
///
/// In every other case the block evaluates to `Ok(())`.
///
/// Each `required` entry has the form `(reference_to_field, "display name")`.
macro_rules! validate_backend {
    (
        enable: $enable:expr,
        name: $backend_name:expr,
        required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
        $(, custom_validator: $custom_validator:expr)?
    ) => {{
        if $enable {
            let mut absent = Vec::new();

            // Built-in checks, in declaration order.
            $(
                if FieldValidator::is_empty($field) {
                    absent.push($field_name);
                }
            )*

            // Backend-specific checks, if any.
            $(
                $custom_validator(&mut absent);
            )?

            if !absent.is_empty() {
                let msg = format!(
                    "{} {} must be set when --{} is enabled.",
                    $backend_name,
                    absent.join(", "),
                    $backend_name.to_lowercase()
                );
                return Err(BoxedError::new(error::MissingConfigSnafu { msg }.build()));
            }
        }

        Ok(())
    }};
}
193
// Generates `PrefixedAzblobConnection`: the Azure Blob options as `azblob_*` fields, each
// of which requires `enable_azblob`, plus the conversion into `AzblobConnection`.
// The `#[doc]` strings are forwarded onto the generated fields, so they are left untouched.
wrap_with_clap_prefix! {
    PrefixedAzblobConnection,
    "azblob-",
    "enable_azblob",
    AzblobConnection,
    {
        #[doc = "The container of the object store."]
        container: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The account name of the object store."]
        account_name: Option<SecretString>,
        #[doc = "The account key of the object store."]
        account_key: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
        #[doc = "The SAS token of the object store."]
        sas_token: Option<String>,
    }
}
214
215impl PrefixedAzblobConnection {
216 pub fn validate(&self) -> Result<(), BoxedError> {
217 validate_backend!(
218 enable: true,
219 name: "AzBlob",
220 required: [
221 (&self.azblob_container, "container"),
222 (&self.azblob_root, "root"),
223 (&self.azblob_account_name, "account name"),
224 (&self.azblob_endpoint, "endpoint"),
225 ],
226 custom_validator: |missing: &mut Vec<&str>| {
227 if self.azblob_sas_token.is_none()
229 && self.azblob_account_key.is_empty()
230 {
231 missing.push("account key (when sas_token is not provided)");
232 }
233 }
234 )
235 }
236}
237
// Generates `PrefixedS3Connection`: the S3 options as `s3_*` fields, each of which
// requires `enable_s3`, plus the conversion into `S3Connection`.
// The `#[doc]` strings are forwarded onto the generated fields, so they are left untouched.
wrap_with_clap_prefix! {
    PrefixedS3Connection,
    "s3-",
    "enable_s3",
    S3Connection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: Option<SecretString>,
        #[doc = "The secret access key of the object store."]
        secret_access_key: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: Option<String>,
        #[doc = "The region of the object store."]
        region: Option<String>,
        #[doc = "Enable virtual host style for the object store."]
        enable_virtual_host_style: bool = Default::default(),
        #[doc = "Disable EC2 metadata service for the object store."]
        disable_ec2_metadata: bool = Default::default(),
    }
}
262
263impl PrefixedS3Connection {
264 pub fn validate(&self) -> Result<(), BoxedError> {
265 validate_backend!(
266 enable: true,
267 name: "S3",
268 required: [
269 (&self.s3_bucket, "bucket"),
270 (&self.s3_region, "region"),
271 ]
272 )
273 }
274}
275
// Generates `PrefixedOssConnection`: the OSS options as `oss_*` fields, each of which
// requires `enable_oss`, plus the conversion into `OssConnection`.
// The `#[doc]` strings are forwarded onto the generated fields, so they are left untouched.
wrap_with_clap_prefix! {
    PrefixedOssConnection,
    "oss-",
    "enable_oss",
    OssConnection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: Option<SecretString>,
        #[doc = "The access key secret of the object store."]
        access_key_secret: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}
294
295impl PrefixedOssConnection {
296 pub fn validate(&self) -> Result<(), BoxedError> {
297 validate_backend!(
298 enable: true,
299 name: "OSS",
300 required: [
301 (&self.oss_bucket, "bucket"),
302 (&self.oss_access_key_id, "access key ID"),
303 (&self.oss_access_key_secret, "access key secret"),
304 (&self.oss_endpoint, "endpoint"),
305 ]
306 )
307 }
308}
309
// Generates `PrefixedGcsConnection`: the GCS options as `gcs_*` fields, each of which
// requires `enable_gcs`, plus the conversion into `GcsConnection`.
// The `#[doc]` strings are forwarded onto the generated fields, so they are left untouched.
wrap_with_clap_prefix! {
    PrefixedGcsConnection,
    "gcs-",
    "enable_gcs",
    GcsConnection,
    {
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The scope of the object store."]
        scope: String = Default::default(),
        #[doc = "The credential path of the object store."]
        credential_path: Option<SecretString>,
        #[doc = "The credential of the object store."]
        credential: Option<SecretString>,
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}
330
331impl PrefixedGcsConnection {
332 pub fn validate(&self) -> Result<(), BoxedError> {
333 validate_backend!(
334 enable: true,
335 name: "GCS",
336 required: [
337 (&self.gcs_bucket, "bucket"),
338 (&self.gcs_root, "root"),
339 (&self.gcs_scope, "scope"),
340 ]
341 )
345 }
346}
347
// Command-line options that select and configure a remote object store backend.
//
// The four `enable_*` flags belong to the `storage_backend` argument group, which is
// optional (`required(false)`) and admits at most one member (`multiple(false)`).
//
// NOTE: plain `//` comments are used here on purpose. clap turns `///` doc comments on a
// `Parser` struct and its fields into help text, which would change the CLI output.
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
pub struct ObjectStoreConfig {
    // `--s3`: selects the S3 backend.
    #[clap(long = "s3", group = "storage_backend")]
    pub enable_s3: bool,

    // S3 connection options, flattened into this parser.
    #[clap(flatten)]
    pub s3: PrefixedS3Connection,

    // `--oss`: selects the OSS backend.
    #[clap(long = "oss", group = "storage_backend")]
    pub enable_oss: bool,

    // OSS connection options, flattened into this parser.
    #[clap(flatten)]
    pub oss: PrefixedOssConnection,

    // `--gcs`: selects the GCS backend.
    #[clap(long = "gcs", group = "storage_backend")]
    pub enable_gcs: bool,

    // GCS connection options, flattened into this parser.
    #[clap(flatten)]
    pub gcs: PrefixedGcsConnection,

    // `--azblob`: selects the Azure Blob backend.
    #[clap(long = "azblob", group = "storage_backend")]
    pub enable_azblob: bool,

    // Azure Blob connection options, flattened into this parser.
    #[clap(flatten)]
    pub azblob: PrefixedAzblobConnection,
}
394
395pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
397 let builder = Fs::default().root(root);
398 let object_store = ObjectStore::new(builder)
399 .context(error::InitBackendSnafu)
400 .map_err(BoxedError::new)?
401 .finish();
402
403 Ok(with_instrument_layers(object_store, false))
404}
405
/// Generates a `pub fn $method(&self) -> Result<ObjectStore, BoxedError>` that builds an
/// object store from the options held in `self.$field`.
///
/// The generated method clones `self.$field`, converts it into `$conn_type`, logs the
/// resulting configuration, builds a `$service_type` from it and wraps the finished store
/// first in the retry layers and then in the instrumentation layers.
macro_rules! gen_object_store_builder {
    ($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
        pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
            let connection = <$conn_type>::from(self.$field.clone());

            // NOTE(review): the whole connection is logged with `{:?}`; confirm that its
            // `Debug` output redacts every credential-bearing field.
            common_telemetry::info!(
                "Building object store with {}: {:?}",
                stringify!($field),
                connection
            );

            let service = <$service_type>::from(&connection);
            let store = ObjectStore::new(service)
                .context(error::InitBackendSnafu)
                .map_err(BoxedError::new)?
                .finish();

            let store = with_retry_layers(store);
            Ok(with_instrument_layers(store, false))
        }
    };
}
426
427impl ObjectStoreConfig {
428 gen_object_store_builder!(build_s3, s3, S3Connection, S3);
429
430 gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
431
432 gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
433
434 gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
435
436 pub fn validate(&self) -> Result<(), BoxedError> {
437 if self.enable_s3 {
438 self.s3.validate()?;
439 }
440 if self.enable_oss {
441 self.oss.validate()?;
442 }
443 if self.enable_gcs {
444 self.gcs.validate()?;
445 }
446 if self.enable_azblob {
447 self.azblob.validate()?;
448 }
449 Ok(())
450 }
451
452 pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
454 self.validate()?;
455
456 if self.enable_s3 {
457 self.build_s3().map(Some)
458 } else if self.enable_oss {
459 self.build_oss().map(Some)
460 } else if self.enable_gcs {
461 self.build_gcs().map(Some)
462 } else if self.enable_azblob {
463 self.build_azblob().map(Some)
464 } else {
465 Ok(None)
466 }
467 }
468}