1use common_base::secrets::{ExposeSecret, SecretString};
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
25trait IntoField<T> {
29 fn into_field(self) -> T;
30}
31
32impl<T> IntoField<T> for T {
34 fn into_field(self) -> T {
35 self
36 }
37}
38
39impl IntoField<SecretString> for Option<SecretString> {
41 fn into_field(self) -> SecretString {
42 self.unwrap_or_default()
43 }
44}
45
46trait FieldValidator {
52 fn is_empty(&self) -> bool;
54}
55
56impl FieldValidator for String {
58 fn is_empty(&self) -> bool {
59 self.is_empty()
60 }
61}
62
63impl FieldValidator for bool {
65 fn is_empty(&self) -> bool {
66 !self
67 }
68}
69
70impl FieldValidator for Option<String> {
72 fn is_empty(&self) -> bool {
73 self.as_ref().is_none_or(|s| s.is_empty())
74 }
75}
76
77impl FieldValidator for Option<SecretString> {
80 fn is_empty(&self) -> bool {
81 self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
82 }
83}
84
85macro_rules! wrap_with_clap_prefix {
86 (
87 $new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
88 $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $( #[hide = $hide:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
89 }
90 ) => {
91 paste!{
92 #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
93 pub struct $new_name {
94 $(
95 $( #[doc = $doc] )?
96 $( #[clap(alias = $alias)] )?
97 $( #[clap(hide = $hide)] )?
98 #[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
99 pub [<$prefix $field>]: $type,
100 )*
101 }
102
103 impl From<$new_name> for $base {
104 fn from(w: $new_name) -> Self {
105 Self {
106 $( $field: w.[<$prefix $field>].into_field() ),*
108 }
109 }
110 }
111 }
112 };
113}
114
115macro_rules! validate_backend {
156 (
157 enable: $enable:expr,
158 name: $backend_name:expr,
159 required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
160 $(, custom_validator: $custom_validator:expr)?
161 ) => {{
162 if $enable {
163 let mut missing = Vec::new();
165 $(
166 if FieldValidator::is_empty($field) {
167 missing.push($field_name);
168 }
169 )*
170
171 $(
173 $custom_validator(&mut missing);
174 )?
175
176 if !missing.is_empty() {
177 return Err(BoxedError::new(
178 error::MissingConfigSnafu {
179 msg: format!(
180 "{} {} must be set when --{} is enabled.",
181 $backend_name,
182 missing.join(", "),
183 $backend_name.to_lowercase()
184 ),
185 }
186 .build(),
187 ));
188 }
189 }
190
191 Ok(())
192 }};
193}
194
195wrap_with_clap_prefix! {
196 PrefixedAzblobConnection,
197 "azblob-",
198 "enable_azblob",
199 AzblobConnection,
200 {
201 #[doc = "The container of the object store."]
202 container: String = Default::default(),
203 #[doc = "The root of the object store."]
204 root: String = Default::default(),
205 #[doc = "The account name of the object store."]
206 account_name: Option<SecretString>,
207 #[doc = "The account key of the object store."]
208 account_key: Option<SecretString>,
209 #[doc = "The endpoint of the object store."]
210 endpoint: String = Default::default(),
211 #[doc = "The SAS token of the object store."]
212 sas_token: Option<String>,
213 }
214}
215
216impl PrefixedAzblobConnection {
217 pub fn validate(&self) -> Result<(), BoxedError> {
218 validate_backend!(
219 enable: true,
220 name: "AzBlob",
221 required: [
222 (&self.azblob_container, "container"),
223 (&self.azblob_endpoint, "endpoint"),
224 ]
225 )
226 }
227}
228
229wrap_with_clap_prefix! {
230 PrefixedS3Connection,
231 "s3-",
232 "enable_s3",
233 S3Connection,
234 {
235 #[doc = "The bucket of the object store."]
236 bucket: String = Default::default(),
237 #[doc = "The root of the object store."]
238 root: String = Default::default(),
239 #[doc = "The access key ID of the object store."]
240 access_key_id: Option<SecretString>,
241 #[doc = "The secret access key of the object store."]
242 secret_access_key: Option<SecretString>,
243 #[doc = "The endpoint of the object store."]
244 endpoint: Option<String>,
245 #[doc = "The region of the object store."]
246 region: Option<String>,
247 #[doc = "Enable virtual host style for the object store."]
248 enable_virtual_host_style: bool = Default::default(),
249 #[doc = "Disable EC2 metadata service for the object store."]
250 disable_ec2_metadata: bool = Default::default(),
251 }
252}
253
254impl PrefixedS3Connection {
255 pub fn validate(&self) -> Result<(), BoxedError> {
256 validate_backend!(
257 enable: true,
258 name: "S3",
259 required: [
260 (&self.s3_bucket, "bucket"),
261 (&self.s3_region, "region"),
262 ]
263 )
264 }
265}
266
267wrap_with_clap_prefix! {
268 PrefixedOssConnection,
269 "oss-",
270 "enable_oss",
271 OssConnection,
272 {
273 #[doc = "The bucket of the object store."]
274 bucket: String = Default::default(),
275 #[doc = "The root of the object store."]
276 root: String = Default::default(),
277 #[doc = "The access key ID of the object store."]
278 access_key_id: Option<SecretString>,
279 #[doc = "The access key secret of the object store."]
280 access_key_secret: Option<SecretString>,
281 #[doc = "The endpoint of the object store."]
282 endpoint: String = Default::default(),
283 }
284}
285
286impl PrefixedOssConnection {
287 pub fn validate(&self) -> Result<(), BoxedError> {
288 validate_backend!(
289 enable: true,
290 name: "OSS",
291 required: [
292 (&self.oss_bucket, "bucket"),
293 (&self.oss_access_key_id, "access key ID"),
294 (&self.oss_access_key_secret, "access key secret"),
295 (&self.oss_endpoint, "endpoint"),
296 ]
297 )
298 }
299}
300
301wrap_with_clap_prefix! {
302 PrefixedGcsConnection,
303 "gcs-",
304 "enable_gcs",
305 GcsConnection,
306 {
307 #[doc = "The root of the object store."]
308 root: String = Default::default(),
309 #[doc = "The bucket of the object store."]
310 bucket: String = Default::default(),
311 #[doc = "The scope of the object store."]
312 scope: String = Default::default(),
313 #[doc = "The credential path of the object store."]
314 #[hide = true]
315 credential_path: Option<SecretString>,
316 #[doc = "The credential of the object store."]
317 credential: Option<SecretString>,
318 #[doc = "The endpoint of the object store."]
319 endpoint: String = Default::default(),
320 }
321}
322
323impl PrefixedGcsConnection {
324 pub fn validate(&self) -> Result<(), BoxedError> {
325 validate_backend!(
326 enable: true,
327 name: "GCS",
328 required: [
329 (&self.gcs_bucket, "bucket"),
330 (&self.gcs_root, "root"),
331 (&self.gcs_scope, "scope"),
332 ]
333 )
337 }
338}
339
340#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
356#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
357pub struct ObjectStoreConfig {
358 #[clap(long = "s3", group = "storage_backend")]
360 pub enable_s3: bool,
361
362 #[clap(flatten)]
363 pub s3: PrefixedS3Connection,
364
365 #[clap(long = "oss", group = "storage_backend")]
367 pub enable_oss: bool,
368
369 #[clap(flatten)]
370 pub oss: PrefixedOssConnection,
371
372 #[clap(long = "gcs", group = "storage_backend")]
374 pub enable_gcs: bool,
375
376 #[clap(flatten)]
377 pub gcs: PrefixedGcsConnection,
378
379 #[clap(long = "azblob", group = "storage_backend")]
381 pub enable_azblob: bool,
382
383 #[clap(flatten)]
384 pub azblob: PrefixedAzblobConnection,
385}
386
387pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
389 let builder = Fs::default().root(root);
390 let object_store = ObjectStore::new(builder)
391 .context(error::InitBackendSnafu)
392 .map_err(BoxedError::new)?
393 .finish();
394
395 Ok(with_instrument_layers(object_store, false))
396}
397
398macro_rules! gen_object_store_builder {
399 ($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
400 pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
401 let config = <$conn_type>::from(self.$field.clone());
402 common_telemetry::info!(
403 "Building object store with {}: {:?}",
404 stringify!($field),
405 config
406 );
407 let object_store = ObjectStore::new(<$service_type>::from(&config))
408 .context(error::InitBackendSnafu)
409 .map_err(BoxedError::new)?
410 .finish();
411 Ok(with_instrument_layers(
412 with_retry_layers(object_store),
413 false,
414 ))
415 }
416 };
417}
418
419impl ObjectStoreConfig {
420 gen_object_store_builder!(build_s3, s3, S3Connection, S3);
421
422 gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
423
424 gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
425
426 gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
427
428 pub fn validate(&self) -> Result<(), BoxedError> {
429 if self.enable_s3 {
430 self.s3.validate()?;
431 }
432 if self.enable_oss {
433 self.oss.validate()?;
434 }
435 if self.enable_gcs {
436 self.gcs.validate()?;
437 }
438 if self.enable_azblob {
439 self.azblob.validate()?;
440 }
441 Ok(())
442 }
443
444 pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
446 self.validate()?;
447
448 if self.enable_s3 {
449 self.build_s3().map(Some)
450 } else if self.enable_oss {
451 self.build_oss().map(Some)
452 } else if self.enable_gcs {
453 self.build_gcs().map(Some)
454 } else if self.enable_azblob {
455 self.build_azblob().map(Some)
456 } else {
457 Ok(None)
458 }
459 }
460}