Skip to main content

cli/common/
object_store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_base::secrets::{ExposeSecret, SecretString};
16use common_error::ext::BoxedError;
17use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
18use object_store::util::{with_instrument_layers, with_retry_layers};
19use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
20use paste::paste;
21use snafu::ResultExt;
22
23use crate::error::{self};
24
/// Converts a CLI-side field value into the type stored on the target connection struct.
///
/// Keeping the CLI type separate from the target type lets the CLI hold
/// `Option<SecretString>` (so "flag absent" is distinguishable from "flag given with an
/// empty value") while the target struct stores a plain `SecretString`.
trait IntoField<T> {
    /// Consumes `self` and yields the value for the target field.
    fn into_field(self) -> T;
}

/// Pass-through: when the CLI type already equals the target type, the value is moved
/// across unchanged.
impl<U> IntoField<U> for U {
    fn into_field(self) -> U {
        self
    }
}
38
39/// Convert `Option<SecretString>` to `SecretString`, using default for None.
40impl IntoField<SecretString> for Option<SecretString> {
41    fn into_field(self) -> SecretString {
42        self.unwrap_or_default()
43    }
44}
45
/// Decides whether a CLI field counts as "not provided" for required-field validation.
///
/// `validate_backend!` calls this on the required fields of an enabled backend.
/// Values treated as empty: `""` for strings, `false` for flags, and `None` or `Some("")`
/// for optional strings and secrets.
trait FieldValidator {
    /// Returns `true` when the field carries no meaningful value.
    fn is_empty(&self) -> bool;
}

/// A `String` is empty when it has zero length.
impl FieldValidator for String {
    fn is_empty(&self) -> bool {
        // Go through `str` explicitly so this cannot be mistaken for a recursive call to
        // `FieldValidator::is_empty`.
        self.as_str().is_empty()
    }
}

/// A flag is "empty" when it is `false`; `true` counts as provided.
impl FieldValidator for bool {
    fn is_empty(&self) -> bool {
        !*self
    }
}

/// An optional string is empty when it is absent or holds `""`.
impl FieldValidator for Option<String> {
    fn is_empty(&self) -> bool {
        match self {
            Some(value) => value.as_str().is_empty(),
            None => true,
        }
    }
}
76
77/// Option<SecretString> fields: None or empty secret is empty
78/// For secrets, Some("") is treated as "not provided" for both checks
79impl FieldValidator for Option<SecretString> {
80    fn is_empty(&self) -> bool {
81        self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
82    }
83}
84
/// Generates a clap-parsable wrapper around a connection struct. Every flag of the
/// wrapper shares a common prefix and depends on a backend enable flag.
///
/// - `$wrapper`: name of the generated `pub struct`.
/// - `$prefix`: string literal pasted in front of every field name via `paste!`. For
///   example, `"s3-"` + `bucket` yields the `s3_bucket` field that the validators in this
///   file read.
/// - `$enable_flag`: clap argument id that every generated flag `requires`.
/// - `$target`: connection type produced by the generated `From<$wrapper>` impl.
/// - Per field: optional `#[doc = ...]` (clap help text), `#[alias = ...]` and
///   `#[hide = ...]` (in that order), then `name: Type`, and optionally `= expr` used as
///   clap's `default_value_t`.
macro_rules! wrap_with_clap_prefix {
    (
        $wrapper:ident, $prefix:literal, $enable_flag:literal, $target:ty, {
            $( $( #[doc = $help:expr] )? $( #[alias = $alias_name:literal] )? $( #[hide = $hidden:literal] )? $name:ident : $field_ty:ty $( = $fallback:expr )? ),* $(,)?
        }
    ) => {
        paste! {
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $wrapper {
                $(
                    $( #[doc = $help] )?
                    $( #[clap(alias = $alias_name)] )?
                    $( #[clap(hide = $hidden)] )?
                    #[clap(long, requires = $enable_flag $(, default_value_t = $fallback )? )]
                    pub [<$prefix $name>]: $field_ty,
                )*
            }

            impl From<$wrapper> for $target {
                fn from(prefixed: $wrapper) -> Self {
                    // `into_field()` bridges CLI-only shapes such as `Option<SecretString>`
                    // to the target struct's field type (identity for matching types).
                    Self {
                        $( $name: prefixed.[<$prefix $name>].into_field() ),*
                    }
                }
            }
        }
    };
}
114
/// Declaratively validates the required fields of one storage backend.
///
/// # Validation Rules
///
/// When `enable` evaluates to `true`, every listed field is checked with
/// `FieldValidator::is_empty`. The names of the empty fields are collected in the order
/// given, and the optional `custom_validator` may then append more. If anything is missing,
/// the macro **returns early from the enclosing function** with a boxed `MissingConfig`
/// error. Otherwise the macro evaluates to `Ok(())`.
///
/// When `enable` is `false` nothing is checked. clap's `requires` attribute already
/// prevents backend fields from being supplied without the backend's enable flag.
///
/// # Syntax
///
/// ```ignore
/// validate_backend!(
///     enable: <bool expr>,
///     name: "S3",
///     required: [(field1, "name1"), (field2, "name2"), ...],
///     custom_validator: |missing| { ... }  // optional
/// )
/// ```
///
/// # Arguments
///
/// - `enable`: boolean expression; validation only runs when it is `true`.
/// - `name`: backend name, in practice a `&str` literal. It is used verbatim as the
///   message prefix and, lowercased, as the `--flag` in the message. It is evaluated at
///   most once, and only when a field is missing.
/// - `required`: `(field_ref, field_name)` pairs. `field_ref` must be a reference to a type
///   implementing `FieldValidator`.
/// - `custom_validator`: optional closure taking `&mut Vec<&str>`; it may push extra
///   missing-field descriptions.
///
/// # Example
///
/// ```ignore
/// validate_backend!(
///     enable: true,
///     name: "S3",
///     required: [
///         (&self.s3_bucket, "bucket"),
///         (&self.s3_region, "region"),
///     ]
/// )
/// ```
macro_rules! validate_backend {
    (
        enable: $enable:expr,
        name: $backend_name:expr,
        required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
        $(, custom_validator: $custom_validator:expr)?
    ) => {{
        if $enable {
            // Collect every missing field first so the user sees them all at once.
            let mut missing = Vec::new();
            $(
                if FieldValidator::is_empty($field) {
                    missing.push($field_name);
                }
            )*

            // Backend-specific rules that cannot be expressed as a plain required field.
            $(
                $custom_validator(&mut missing);
            )?

            if !missing.is_empty() {
                // Bind once: the name is needed both verbatim and lowercased, and a macro
                // argument should not be evaluated more than once.
                let backend_name = $backend_name;
                return Err(BoxedError::new(
                    error::MissingConfigSnafu {
                        msg: format!(
                            "{} {} must be set when --{} is enabled.",
                            backend_name,
                            missing.join(", "),
                            backend_name.to_lowercase()
                        ),
                    }
                    .build(),
                ));
            }
        }

        Ok(())
    }};
}
194
195wrap_with_clap_prefix! {
196    PrefixedAzblobConnection,
197    "azblob-",
198    "enable_azblob",
199    AzblobConnection,
200    {
201        #[doc = "The container of the object store."]
202        container: String = Default::default(),
203        #[doc = "The root of the object store."]
204        root: String = Default::default(),
205        #[doc = "The account name of the object store."]
206        account_name: Option<SecretString>,
207        #[doc = "The account key of the object store."]
208        account_key: Option<SecretString>,
209        #[doc = "The endpoint of the object store."]
210        endpoint: String = Default::default(),
211        #[doc = "The SAS token of the object store."]
212        sas_token: Option<String>,
213    }
214}
215
216impl PrefixedAzblobConnection {
217    pub fn validate(&self) -> Result<(), BoxedError> {
218        validate_backend!(
219            enable: true,
220            name: "AzBlob",
221            required: [
222                (&self.azblob_container, "container"),
223                (&self.azblob_root, "root"),
224                (&self.azblob_account_name, "account name"),
225                (&self.azblob_endpoint, "endpoint"),
226            ],
227            custom_validator: |missing: &mut Vec<&str>| {
228                // account_key is only required if sas_token is not provided
229                if self.azblob_sas_token.is_none()
230                    && self.azblob_account_key.is_empty()
231                {
232                    missing.push("account key (when sas_token is not provided)");
233                }
234            }
235        )
236    }
237}
238
239wrap_with_clap_prefix! {
240    PrefixedS3Connection,
241    "s3-",
242    "enable_s3",
243    S3Connection,
244    {
245        #[doc = "The bucket of the object store."]
246        bucket: String = Default::default(),
247        #[doc = "The root of the object store."]
248        root: String = Default::default(),
249        #[doc = "The access key ID of the object store."]
250        access_key_id: Option<SecretString>,
251        #[doc = "The secret access key of the object store."]
252        secret_access_key: Option<SecretString>,
253        #[doc = "The endpoint of the object store."]
254        endpoint: Option<String>,
255        #[doc = "The region of the object store."]
256        region: Option<String>,
257        #[doc = "Enable virtual host style for the object store."]
258        enable_virtual_host_style: bool = Default::default(),
259        #[doc = "Disable EC2 metadata service for the object store."]
260        disable_ec2_metadata: bool = Default::default(),
261    }
262}
263
264impl PrefixedS3Connection {
265    pub fn validate(&self) -> Result<(), BoxedError> {
266        validate_backend!(
267            enable: true,
268            name: "S3",
269            required: [
270                (&self.s3_bucket, "bucket"),
271                (&self.s3_region, "region"),
272            ]
273        )
274    }
275}
276
277wrap_with_clap_prefix! {
278    PrefixedOssConnection,
279    "oss-",
280    "enable_oss",
281    OssConnection,
282    {
283        #[doc = "The bucket of the object store."]
284        bucket: String = Default::default(),
285        #[doc = "The root of the object store."]
286        root: String = Default::default(),
287        #[doc = "The access key ID of the object store."]
288        access_key_id: Option<SecretString>,
289        #[doc = "The access key secret of the object store."]
290        access_key_secret: Option<SecretString>,
291        #[doc = "The endpoint of the object store."]
292        endpoint: String = Default::default(),
293    }
294}
295
296impl PrefixedOssConnection {
297    pub fn validate(&self) -> Result<(), BoxedError> {
298        validate_backend!(
299            enable: true,
300            name: "OSS",
301            required: [
302                (&self.oss_bucket, "bucket"),
303                (&self.oss_access_key_id, "access key ID"),
304                (&self.oss_access_key_secret, "access key secret"),
305                (&self.oss_endpoint, "endpoint"),
306            ]
307        )
308    }
309}
310
311wrap_with_clap_prefix! {
312    PrefixedGcsConnection,
313    "gcs-",
314    "enable_gcs",
315    GcsConnection,
316    {
317        #[doc = "The root of the object store."]
318        root: String = Default::default(),
319        #[doc = "The bucket of the object store."]
320        bucket: String = Default::default(),
321        #[doc = "The scope of the object store."]
322        scope: String = Default::default(),
323        #[doc = "The credential path of the object store."]
324        #[hide = true]
325        credential_path: Option<SecretString>,
326        #[doc = "The credential of the object store."]
327        credential: Option<SecretString>,
328        #[doc = "The endpoint of the object store."]
329        endpoint: String = Default::default(),
330    }
331}
332
333impl PrefixedGcsConnection {
334    pub fn validate(&self) -> Result<(), BoxedError> {
335        validate_backend!(
336            enable: true,
337            name: "GCS",
338            required: [
339                (&self.gcs_bucket, "bucket"),
340                (&self.gcs_root, "root"),
341                (&self.gcs_scope, "scope"),
342            ]
343            // No custom_validator needed: GCS supports Application Default Credentials (ADC)
344            // where neither credential_path nor credential is required.
345            // Endpoint is also optional (defaults to https://storage.googleapis.com).
346        )
347    }
348}
349
/// Common config for object store.
///
/// # Dependency Enforcement
///
/// Each backend's configuration fields (e.g., `--s3-bucket`) requires its corresponding
/// enable flag (e.g., `--s3`) to be present. This is enforced by `clap` at parse time
/// using the `requires` attribute.
///
/// For example, attempting to use `--s3-bucket my-bucket` without `--s3` will result in:
/// ```text
/// error: The argument '--s3-bucket <BUCKET>' requires '--s3'
/// ```
///
/// This ensures that users cannot accidentally provide backend-specific configuration
/// without explicitly enabling that backend.
// NOTE(review): the error text quoted above is illustrative. The exact wording depends on
// the clap version in use; confirm before relying on it (e.g. in tests).
// clap derive uses the `///` comments in this struct as `--help` text, so treat them as
// user-facing strings.
// Backend selection: the four enable flags share the `storage_backend` arg group. The
// group is optional (`required(false)`) and exclusive (`multiple(false)`), so at most one
// of `--s3`, `--oss`, `--gcs`, `--azblob` can be passed.
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
pub struct ObjectStoreConfig {
    /// Whether to use S3 object store.
    #[clap(long = "s3", group = "storage_backend")]
    pub enable_s3: bool,

    // S3 connection flags (`--s3-*`), each requiring `--s3`.
    #[clap(flatten)]
    pub s3: PrefixedS3Connection,

    /// Whether to use OSS.
    #[clap(long = "oss", group = "storage_backend")]
    pub enable_oss: bool,

    // OSS connection flags (`--oss-*`), each requiring `--oss`.
    #[clap(flatten)]
    pub oss: PrefixedOssConnection,

    /// Whether to use GCS.
    #[clap(long = "gcs", group = "storage_backend")]
    pub enable_gcs: bool,

    // GCS connection flags (`--gcs-*`), each requiring `--gcs`.
    #[clap(flatten)]
    pub gcs: PrefixedGcsConnection,

    /// Whether to use Azure Blob.
    #[clap(long = "azblob", group = "storage_backend")]
    pub enable_azblob: bool,

    // Azure Blob connection flags (`--azblob-*`), each requiring `--azblob`.
    #[clap(flatten)]
    pub azblob: PrefixedAzblobConnection,
}
396
397/// Creates a new file system object store.
398pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
399    let builder = Fs::default().root(root);
400    let object_store = ObjectStore::new(builder)
401        .context(error::InitBackendSnafu)
402        .map_err(BoxedError::new)?
403        .finish();
404
405    Ok(with_instrument_layers(object_store, false))
406}
407
/// Generates `pub fn $builder(&self) -> Result<ObjectStore, BoxedError>` for one backend.
///
/// The generated method clones the `$conn` field, converts it into `$conn_ty`, and logs it
/// at `info` level with `{:?}`. It then builds a `$service_ty` from it and wraps the
/// resulting store with retry layers followed by instrument layers.
///
/// NOTE(review): whether the logged `Debug` output redacts credentials depends on the
/// connection types' `Debug` impls, which are not visible here. Confirm this, e.g. for the
/// Azure SAS token that the CLI holds as a plain `Option<String>`.
macro_rules! gen_object_store_builder {
    ($builder:ident, $conn:ident, $conn_ty:ty, $service_ty:ty) => {
        pub fn $builder(&self) -> Result<ObjectStore, BoxedError> {
            let conn_config = <$conn_ty>::from(self.$conn.clone());
            common_telemetry::info!(
                "Building object store with {}: {:?}",
                stringify!($conn),
                conn_config
            );
            let service = <$service_ty>::from(&conn_config);
            let raw_store = ObjectStore::new(service)
                .context(error::InitBackendSnafu)
                .map_err(BoxedError::new)?
                .finish();
            let retrying_store = with_retry_layers(raw_store);
            Ok(with_instrument_layers(retrying_store, false))
        }
    };
}
428
429impl ObjectStoreConfig {
430    gen_object_store_builder!(build_s3, s3, S3Connection, S3);
431
432    gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
433
434    gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
435
436    gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
437
438    pub fn validate(&self) -> Result<(), BoxedError> {
439        if self.enable_s3 {
440            self.s3.validate()?;
441        }
442        if self.enable_oss {
443            self.oss.validate()?;
444        }
445        if self.enable_gcs {
446            self.gcs.validate()?;
447        }
448        if self.enable_azblob {
449            self.azblob.validate()?;
450        }
451        Ok(())
452    }
453
454    /// Builds the object store from the config.
455    pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
456        self.validate()?;
457
458        if self.enable_s3 {
459            self.build_s3().map(Some)
460        } else if self.enable_oss {
461            self.build_oss().map(Some)
462        } else if self.enable_gcs {
463            self.build_gcs().map(Some)
464        } else if self.enable_azblob {
465            self.build_azblob().map(Some)
466        } else {
467            Ok(None)
468        }
469    }
470}